
    Build a Hybrid-Memory Autonomous Agent with Modular Architecture and Tool Dispatch Using OpenAI



    In this tutorial, we explore the architecture behind a hybrid-memory autonomous agent. The system combines semantic vector search, keyword-based retrieval, and a modular tool-dispatching loop to create an agent capable of reasoning, remembering, and acting autonomously. We walk through each layer of the design from the ground up, from abstract interfaces that enforce a clean separation of concerns to a live agent that manages its own long-term memory.

    !pip install openai numpy rank_bm25 --quiet
    
    
    import os, json, math, re, time, getpass
    from abc import ABC, abstractmethod
    from dataclasses import dataclass, field
    from typing import Any, Callable, Dict, List, Optional, Tuple
    
    
    import numpy as np
    from rank_bm25 import BM25Okapi
    from openai import OpenAI
    
    
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or getpass.getpass("  Enter your OpenAI API key (hidden): ")
    client = OpenAI(api_key=OPENAI_API_KEY)
    
    
    EMBED_MODEL = "text-embedding-3-small"
    CHAT_MODEL  = "gpt-4o-mini"
    
    
    print("✅  OpenAI client ready.")

    We kick things off by installing the required dependencies and importing everything the later snippets need. We read the OpenAI API key from the OPENAI_API_KEY environment variable and, if it is not set, fall back to a getpass prompt so that the key is never echoed to the terminal or notebook output. We also define two global constants, the embedding model and the chat model, on which every subsequent snippet depends.
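
    The key-resolution order described above can be factored into a small helper, which also makes it testable without an interactive prompt. This is only a minimal sketch: resolve_api_key and its parameters are hypothetical names introduced here for illustration and are not part of the tutorial's code.

```python
from typing import Callable, Mapping


def resolve_api_key(
    env: Mapping[str, str],
    prompt: Callable[[str], str],
    var: str = "OPENAI_API_KEY",
) -> str:
    """Return the key from `env` if present, otherwise ask for it via `prompt`.

    `prompt` is expected to behave like getpass.getpass: it takes a message
    and returns what the user typed without echoing it.
    """
    key = (env.get(var) or prompt(f"Enter {var} (hidden): ")).strip()
    if not key:
        raise ValueError(f"{var} is empty")
    return key


# Demo with a fake environment and a prompt that is never reached.
demo_key = resolve_api_key({"OPENAI_API_KEY": "sk-demo"}, prompt=lambda _msg: "")
print(demo_key)
```

    In a notebook, the equivalent call would be resolve_api_key(os.environ, getpass.getpass).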

    class MemoryBackend(ABC):
       @abstractmethod
       def store(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> str: ...
       @abstractmethod
       def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]: ...
       @abstractmethod
       def list_all(self) -> List[Dict[str, Any]]: ...
    
    
    class LLMProvider(ABC):
       @abstractmethod
       def complete(self, messages: List[Dict], tools: Optional[List] = None) -> Dict: ...
    
    
    class Tool(ABC):
       name: str
       description: str
    
    
       @abstractmethod
       def run(self, **kwargs) -> str: ...
    
    
       def schema(self) -> Dict:
           return {
               "type": "function",
               "function": {
                   "name": self.name,
                   "description": self.description,
                   "parameters": {"type": "object", "properties": {}, "required": []},
               },
           }
    
    
    
    
    @dataclass
    class MemoryChunk:
       id: str
       text: str
       metadata: Dict[str, Any]
       embedding: Optional[np.ndarray] = field(default=None, repr=False)
    
    
    
    
    def _embed(texts: List[str]) -> List[np.ndarray]:
       resp = client.embeddings.create(model=EMBED_MODEL, input=texts)
       vecs = [np.array(d.embedding, dtype=np.float32) for d in resp.data]
       return [v / (np.linalg.norm(v) + 1e-10) for v in vecs]
    
    
    
    
    def _tokenise(text: str) -> List[str]:
       return re.sub(r"[^a-z0-9\s]", "", text.lower()).split()
    
    
    
    
    class HybridMemory(MemoryBackend):
       RRF_K = 60
    
    
       def __init__(self):
           self._chunks: List[MemoryChunk] = []
           self._bm25: Optional[BM25Okapi] = None
           self._counter = 0
    
    
       def store(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> str:
           metadata = metadata or {}
           self._counter += 1
           chunk_id = f"mem_{self._counter:04d}"
           [vec] = _embed([text])
           chunk = MemoryChunk(id=chunk_id, text=text, metadata=metadata, embedding=vec)
           self._chunks.append(chunk)
           # Rebuild the BM25 index over the full corpus on every store (fine for small demos).
           corpus = [_tokenise(c.text) for c in self._chunks]
           self._bm25 = BM25Okapi(corpus)
           preview = text[:60] + "…" if len(text) > 60 else text
           print(f"   💾  Stored [{chunk_id}]: {preview}")
           return chunk_id
    
    
       def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
           if not self._chunks:
               return []
           n = len(self._chunks)
           top_k = min(top_k, n)
    
    
           [q_vec] = _embed([query])
           cos_scores = np.array([np.dot(q_vec, c.embedding) for c in self._chunks])
           vec_ranks = {self._chunks[i].id: rank + 1 for rank, i in enumerate(np.argsort(-cos_scores))}
    
    
           bm25_scores = self._bm25.get_scores(_tokenise(query))
           kw_ranks = {self._chunks[i].id: rank + 1 for rank, i in enumerate(np.argsort(-bm25_scores))}
    
    
           rrf: Dict[str, float] = {}
           for chunk in self._chunks:
               cid = chunk.id
               rrf[cid] = (1.0 / (self.RRF_K + vec_ranks.get(cid, n + 1)) +
                           1.0 / (self.RRF_K + kw_ranks.get(cid, n + 1)))
    
    
           ranked_ids = sorted(rrf, key=lambda x: rrf[x], reverse=True)[:top_k]
           # Map each chunk id to its position once instead of rescanning the list per result.
           index_of = {c.id: i for i, c in enumerate(self._chunks)}
           results = []
           for cid in ranked_ids:
               i = index_of[cid]
               chunk = self._chunks[i]
               results.append({
                   "id": chunk.id,
                   "text": chunk.text,
                   "metadata": chunk.metadata,
                   "rrf_score": round(rrf[cid], 6),
                   "cosine": round(float(cos_scores[i]), 4),
                   "bm25": round(float(bm25_scores[i]), 4),
               })
           return results
    
    
       def list_all(self) -> List[Dict[str, Any]]:
           return [{"id": c.id, "text": c.text, "metadata": c.metadata} for c in self._chunks]
    
    
    
    
    class OpenAIProvider(LLMProvider):
       def __init__(self, model: str = CHAT_MODEL, temperature: float = 0.2):
           self.model = model
           self.temperature = temperature
    
    
       def complete(self, messages: List[Dict], tools: Optional[List] = None) -> Dict:
           kwargs: Dict[str, Any] = dict(model=self.model, messages=messages, temperature=self.temperature)
           if tools:
               kwargs["tools"] = tools
               kwargs["tool_choice"] = "auto"
           response = client.chat.completions.create(**kwargs)
           msg = response.choices[0].message
           result: Dict[str, Any] = {"role": "assistant", "content": msg.content or ""}
           if msg.tool_calls:
               result["tool_calls"] = [
                   {
                       "id": tc.id,
                       "type": "function",
                       "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                   }
                   for tc in msg.tool_calls
               ]
           return result
    
    
    
    
    print("✅  Interfaces, HybridMemory, and OpenAIProvider ready.")

    We define the three core abstract base classes, MemoryBackend, LLMProvider, and Tool, that serve as the interface contracts every concrete component must honour. We then implement HybridMemory, which stores embeddings for vector search and maintains a live BM25 index for keyword matching, merging both result sets using Reciprocal Rank Fusion. We close the snippet with OpenAIProvider, a concrete LLMProvider that normalises the OpenAI response into a provider-agnostic dictionary the agent can consume without knowing which model sits underneath.
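
    To make the fusion step concrete, here is a small worked example of Reciprocal Rank Fusion on made-up rankings, with no API calls involved. It is a sketch under stated assumptions: rrf_fuse is a hypothetical helper, the chunk IDs and their orderings are invented for illustration, and only the constant k = 60 is taken from HybridMemory.RRF_K.

```python
from typing import Dict, List

RRF_K = 60  # same constant as HybridMemory.RRF_K


def rrf_fuse(rankings: List[List[str]], k: int = RRF_K) -> Dict[str, float]:
    """Sum 1 / (k + rank) for each id over all rankings (rank is 1-based)."""
    scores: Dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return scores


# Invented rankings for three memory chunks.
vector_ranking = ["mem_0003", "mem_0001", "mem_0002"]   # best cosine first
keyword_ranking = ["mem_0001", "mem_0002", "mem_0003"]  # best BM25 first

fused = rrf_fuse([vector_ranking, keyword_ranking])
order = sorted(fused, key=fused.get, reverse=True)

for doc_id in order:
    print(f"{doc_id}: {fused[doc_id]:.6f}")
# mem_0001 wins: it is 2nd by vector and 1st by keyword (1/62 + 1/61),
# ahead of mem_0003, which is 1st by vector but last by keyword (1/61 + 1/63).
```

    Because only ranks enter the formula, the cosine and BM25 scores never need to be put on a common scale. In HybridMemory every chunk appears in both rankings, so the n + 1 fallback rank in search is never actually used.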

    class MemoryStoreTool(Tool):
       name = "memory_store"
       description = "Save an important fact or piece of information to long-term memory."
    
    
       def __init__(self, memory: MemoryBackend):
           self._mem = memory
    
    
       def run(self, text: str, category: str = "general") -> str:
           chunk_id = self._mem.store(text, {"category": category})
           return f"Stored as {chunk_id}."
    
    
       def schema(self) -> Dict:
           return {
               "type": "function",
               "function": {
                   "name": self.name,
                   "description": self.description,
                   "parameters": {
                       "type": "object",
                       "properties": {
                           "text":     {"type": "string", "description": "The fact to remember."},
                           "category": {"type": "string", "description": "Category tag, e.g. 'user_pref', 'task', 'fact'."},
                       },
                       "required": ["text"],
                   },
               },
           }
    
    
    
    
    class MemorySearchTool(Tool):
       name = "memory_search"
       description = "Search long-term memory for information relevant to a query."
    
    
       def __init__(self, memory: MemoryBackend):
           self._mem = memory
    
    
       def run(self, query: str, top_k: int = 3) -> str:
           results = self._mem.search(query, top_k=top_k)
           if not results:
               return "No relevant memories found."
           lines = [f"[{r['id']}] (score={r['rrf_score']}) {r['text']}" for r in results]
           return "Relevant memories:\n" + "\n".join(lines)
    
    
       def schema(self) -> Dict:
           return {
               "type": "function",
               "function": {
                   "name": self.name,
                   "description": self.description,
                   "parameters": {
                       "type": "object",
                       "properties": {
                           "query": {"type": "string", "description": "What to look for."},
                           "top_k": {"type": "integer", "description": "Max results (default 3)."},
                       },
                       "required": ["query"],
                   },
               },
           }
    
    
    
    
    class CalculatorTool(Tool):
       name = "calculator"
       description = "Evaluate a safe mathematical expression, e.g. '2 ** 10 + sqrt(144)'."
    
    
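       def run(self, expression: str) -> str:
           # Expose only public names from the math module, plus abs and round.
           allowed = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
           allowed.update({"abs": abs, "round": round})
           try:
               # Caution: emptying __builtins__ narrows what eval can reach but is not a
               # real sandbox, so treat this as demo-grade and avoid untrusted input.
               result = eval(expression, {"__builtins__": {}}, allowed)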
               return str(result)
           except Exception as exc:
               return f"Error: {exc}"
    
    
       def schema(self) -> Dict:
           return {
               "type": "function",
               "function": {
                   "name": self.name,
                   "description": self.description,
                   "parameters": {
                       "type": "object",
                       "properties": {
                           "expression": {"type": "string", "description": "Math expression to evaluate."},
                       },
                       "required": ["expression"],
                   },
               },
           }
    
    
    
    
    class WebSnippetTool(Tool):
       name = "web_search"
       description = "Search the web for current information on a topic (simulated)."
    
    
       _KB = {
           "openai": "OpenAI is an AI safety company that develops the GPT family of models.",
           "rag": "Retrieval-Augmented Generation (RAG) combines a retrieval system with an LLM to ground answers in external documents.",
           "bm25": "BM25 (Best Match 25) is a probabilistic keyword ranking function used in search engines.",
       }
    
    
       def run(self, query: str) -> str:
           q = query.lower()
           for kw, snippet in self._KB.items():
               if kw in q:
                   return f"Web snippet for '{query}': {snippet}"
           return f"No snippet found for '{query}'. (Mock tool — integrate a real search API here.)"
    
    
       def schema(self) -> Dict:
           return {
               "type": "function",
               "function": {
                   "name": self.name,
                   "description": self.description,
                   "parameters": {
                       "type": "object",
                       "properties": {
                           "query": {"type": "string", "description": "Search query."},
                       },
                       "required": ["query"],
                   },
               },
           }
    
    
    
    
    @dataclass
    class AgentPersona:
       name: str
       role: str
       traits: List[str]
       forbidden_phrases: List[str] = field(default_factory=list)
       goals: List[str] = field(default_factory=list)
    
    
       def compile_system_prompt(self, extra_context: str = "") -> str:
           lines = [
               f"You are {self.name}, {self.role}.",
               "",
               "## Core Traits",
               *[f"- {t}" for t in self.traits],
           ]
           if self.goals:
               lines += ["", "## Goals", *[f"- {g}" for g in self.goals]]
           if self.forbidden_phrases:
               lines += ["", "## Forbidden Phrases (never say these)", *[f"- \"{p}\"" for p in self.forbidden_phrases]]
           if extra_context:
               lines += ["", "## Live Context", extra_context]
           lines += [
               "",
               "## Behaviour",
               "- Always reason step-by-step before answering.",
               "- Use available tools proactively; never guess when you can look up.",
               "- After using memory_search, quote the retrieved ID in your answer.",
               "- Keep answers concise unless depth is explicitly requested.",
           ]
           return "\n".join(lines)
    
    
    
    
    ARIA = AgentPersona(
       name="Aria",
       role="a precise, helpful research assistant with a hybrid memory system",
       traits=["Methodical", "Curious", "Transparent about uncertainty", "Concise"],
       goals=[
           "Remember and connect information across conversations",
           "Use tools whenever they can improve accuracy",
       ],
       forbidden_phrases=["I cannot", "As an AI language model"],
    )
    
    
    print("✅  Tools and AgentPersona ready.")

    We implement four tools, MemoryStoreTool, MemorySearchTool, CalculatorTool, and WebSnippetTool (a mock backed by a small in-code knowledge base). Each one subclasses Tool and exposes an OpenAI-compatible JSON schema so the model can call it as a function. We then introduce AgentPersona, a data class that compiles traits, goals, and forbidden phrases into a deterministic system prompt at runtime. We instantiate our demo persona, Aria, whose compiled prompt is placed at the top of the message list on every conversation turn to keep the agent's identity consistent across interactions.
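
    CalculatorTool relies on eval with emptied builtins, which restricts but does not fully sandbox the expression. One more conservative option is to parse the expression and walk its syntax tree, allowing only arithmetic nodes and a whitelist of functions. The following is a sketch of that alternative, not the tutorial's implementation: safe_eval and the whitelists are hypothetical names chosen here.

```python
import ast
import math
import operator
from typing import Union

Number = Union[int, float]

_BIN_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
    ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod, ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_FUNCS = {"sqrt": math.sqrt, "log": math.log, "sin": math.sin,
          "cos": math.cos, "abs": abs, "round": round}
_CONSTS = {"pi": math.pi, "e": math.e}


def safe_eval(expression: str) -> Number:
    """Evaluate arithmetic by walking the AST; reject every other construct."""

    def _eval(node: ast.AST) -> Number:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        if isinstance(node, ast.Name) and node.id in _CONSTS:
            return _CONSTS[node.id]
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*[_eval(arg) for arg in node.args])
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    return _eval(ast.parse(expression, mode="eval"))


print(safe_eval("2 ** 10 + sqrt(144)"))  # 1036.0
```

    Inside CalculatorTool.run, such a helper could replace the eval call while keeping the existing try/except. Note that this sketch does not bound the size of exponents, so a very large power can still be slow to compute.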

    class AutonomousAgent:
       MAX_TOOL_ROUNDS = 8
    
    
       def __init__(self, persona: AgentPersona, llm: LLMProvider, memory: MemoryBackend, tools: List[Tool]):
           self.persona  = persona
           self._llm     = llm
           self._memory  = memory
           self._tools   = {t.name: t for t in tools}
           self._history: List[Dict] = []
    
    
       def chat(self, user_message: str, verbose: bool = True) -> str:
           if verbose:
               print(f"\n{'═'*60}")
               print(f"👤  USER: {user_message}")
               print(f"{'═'*60}")
    
    
           memory_context = self._build_memory_context(user_message)
           system_prompt  = self.persona.compile_system_prompt(memory_context)
    
    
           messages = [{"role": "system", "content": system_prompt}]
           messages += self._history
           messages.append({"role": "user", "content": user_message})
    
    
           tool_schemas = [t.schema() for t in self._tools.values()]
    
    
           for round_num in range(self.MAX_TOOL_ROUNDS):
               reply = self._llm.complete(messages, tools=tool_schemas if tool_schemas else None)
    
    
               if "tool_calls" not in reply:
                   final_text = reply["content"]
                   if verbose:
                       print(f"\n🤖  {self.persona.name.upper()}: {final_text}")
                   self._history.append({"role": "user",      "content": user_message})
                   self._history.append({"role": "assistant", "content": final_text})
                   return final_text
    
    
               messages.append(reply)
    
    
               for tc in reply["tool_calls"]:
                   tool_name = tc["function"]["name"]
                   try:
                       args = json.loads(tc["function"]["arguments"])
                   except json.JSONDecodeError:
                       args = {}
    
    
                   if verbose:
                       print(f"\n🔧  TOOL CALL → {tool_name}({args})")
    
    
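                   # Dispatch defensively: bad arguments or a failing tool become an
                   # error string for the model instead of crashing the loop.
                   if tool_name not in self._tools:
                       result = f"Error: unknown tool '{tool_name}'."
                   else:
                       try:
                           result = str(self._tools[tool_name].run(**args))
                       except Exception as exc:
                           result = f"Error: tool '{tool_name}' failed: {exc}"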
    
    
                   if verbose:
                       print(f"   ↳  RESULT: {result}")
    
    
                   messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result})
    
    
           return "[Agent reached tool round limit — please rephrase your request.]"
    
    
       def register_tool(self, tool: Tool) -> None:
           self._tools[tool.name] = tool
           print(f"   🔌  Tool registered: {tool.name}")
    
    
       def list_tools(self) -> List[str]:
           return list(self._tools.keys())
    
    
       def memory_dump(self) -> List[Dict]:
           return self._memory.list_all()
    
    
       def clear_history(self) -> None:
           self._history.clear()
    
    
       def _build_memory_context(self, query: str) -> str:
           results = self._memory.search(query, top_k=3)
           if not results:
               return ""
           snippets = "\n".join(f"- [{r['id']}] {r['text']}" for r in results)
           return f"Recalled memories related to this query:\n{snippets}"
    
    
    
    
    memory = HybridMemory()
    llm    = OpenAIProvider(model=CHAT_MODEL)
    tools  = [
       MemoryStoreTool(memory),
       MemorySearchTool(memory),
       CalculatorTool(),
       WebSnippetTool(),
    ]
    
    
    agent = AutonomousAgent(persona=ARIA, llm=llm, memory=memory, tools=tools)
    
    
    print(f"✅  Agent '{ARIA.name}' bootstrapped with tools: {agent.list_tools()}")

    We build the AutonomousAgent class, which owns the agentic loop: it repeatedly sends messages to the LLM, detects tool calls, dispatches them to the matching tool, and feeds the results back until a plain-text reply is produced or the MAX_TOOL_ROUNDS limit of eight rounds is reached. We wire together all prior components, HybridMemory, OpenAIProvider, the four tools, and the Aria persona, into a single bootstrapped agent instance ready to receive user messages. We also expose utility methods, such as register_tool for runtime hot-swapping and memory_dump for inspecting the full state of long-term memory.
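
    The loop can be exercised offline by swapping the real provider for a scripted one, which is the kind of substitution the LLMProvider interface is meant to allow. The sketch below is a stripped-down illustration rather than the tutorial's agent: FakeLLM, run_loop, and the multiply tool are hypothetical names, and the fake provider simply requests one tool call and then echoes the tool result.

```python
import json
from typing import Any, Callable, Dict, List, Optional


class FakeLLM:
    """Hypothetical stand-in for an LLMProvider that needs no network access."""

    def complete(self, messages: List[Dict[str, Any]],
                 tools: Optional[List] = None) -> Dict[str, Any]:
        last = messages[-1]
        if last["role"] == "tool":
            # A tool result is available, so produce the final plain-text reply.
            return {"role": "assistant", "content": f"Total hours: {last['content']}"}
        # Otherwise request a tool call in the same dictionary shape the agent expects.
        return {
            "role": "assistant",
            "content": "",
            "tool_calls": [{
                "id": "call_1",
                "type": "function",
                "function": {"name": "multiply",
                             "arguments": json.dumps({"a": 22, "b": 6.5})},
            }],
        }


def run_loop(llm: FakeLLM, tools: Dict[str, Callable[..., Any]],
             user_message: str, max_rounds: int = 8) -> str:
    """Send messages, dispatch tool calls, and stop at the first plain-text reply."""
    messages: List[Dict[str, Any]] = [{"role": "user", "content": user_message}]
    for _ in range(max_rounds):
        reply = llm.complete(messages)
        if "tool_calls" not in reply:
            return reply["content"]
        messages.append(reply)
        for tc in reply["tool_calls"]:
            name = tc["function"]["name"]
            tool = tools.get(name)
            if tool is None:
                result = f"Error: unknown tool '{name}'."
            else:
                try:
                    result = str(tool(**json.loads(tc["function"]["arguments"])))
                except (json.JSONDecodeError, TypeError) as exc:
                    result = f"Error: bad arguments for '{name}': {exc}"
            messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result})
    return "[round limit reached]"


answer = run_loop(FakeLLM(), {"multiply": lambda a, b: a * b},
                  "How many hours is 22 days at 6.5 hours per day?")
print(answer)
```

    The same idea carries over to the full agent: any object with a compatible complete method can be passed as llm, which makes the dispatch logic testable without spending API credits.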

    print("\n" + "═"*60)
    print("DEMO 1 — Pre-seeding long-term memory")
    print("═"*60)
    
    
    facts = [
       ("Alice's favourite programming language is Rust.", {"category": "user_pref"}),
       ("Alice is working on a distributed key-value store called 'VelocityDB'.", {"category": "task"}),
       ("VelocityDB uses the Raft consensus algorithm for replication.", {"category": "fact"}),
       ("Alice has a meeting with the infrastructure team on Friday at 2 PM.", {"category": "calendar"}),
       ("The project deadline for VelocityDB v1.0 is March 31.", {"category": "task"}),
       ("Alice prefers concise answers without unnecessary preamble.", {"category": "user_pref"}),
       ("Order #4821 was placed by Alice for 32 GB of DDR5 RAM modules.", {"category": "order"}),
    ]
    
    
    for text, meta in facts:
       memory.store(text, meta)
    
    
    
    
    print("\n" + "═"*60)
    print("DEMO 2 — Hybrid Memory Search Showdown")
    print("═"*60)
    
    
    test_queries = [
       "What consensus algorithm does VelocityDB use?",
       "order 4821",
       "Alice's language preference",
    ]
    
    
    for q in test_queries:
       print(f"\n🔍  Query: '{q}'")
       results = memory.search(q, top_k=2)
       for r in results:
           print(f"   [{r['id']}] cosine={r['cosine']:.3f}  bm25={r['bm25']:.2f}  rrf={r['rrf_score']:.5f}")
           print(f"        → {r['text']}")
    
    
    
    
    print("\n" + "═"*60)
    print("DEMO 3 — Autonomous Agent Conversations")
    print("═"*60)
    
    
    agent.chat("What do you know about Alice's project? What's the deadline and which algorithm does it rely on?")
    agent.chat("Can you find the details on order number 4821?")
    agent.chat(
       "There are 22 working days until March 31. "
       "If Alice works 6.5 hours per day on VelocityDB, "
       "how many total hours does she have left?"
    )
    agent.chat(
       "Alice just decided to switch the storage engine of VelocityDB from LSM-tree to B-tree. "
       "Please remember this decision."
    )
    agent.chat("What storage engine decision did Alice make for VelocityDB?")
    
    
    
    
    print("\n" + "═"*60)
    print("DEMO 4 — Runtime Tool Hot-Swap (vtable pattern)")
    print("═"*60)
    
    
    class UpgradedWebSnippetTool(WebSnippetTool):
       _KB = {
           **WebSnippetTool._KB,
           "lsm-tree": "An LSM-tree (Log-Structured Merge-tree) optimises write throughput at the cost of read amplification.",
       }
    
    
    agent.register_tool(UpgradedWebSnippetTool())
    agent.chat(
       "Can you search the web for a brief explanation of B-tree storage engines "
       "and tell me if it's a good match for Alice's project?"
    )
    
    
    
    
    print("\n" + "═"*60)
    print("FINAL — Full Memory Dump")
    print("═"*60)
    
    
    for chunk in agent.memory_dump():
       print(f"  [{chunk['id']}] ({chunk['metadata'].get('category','?')}) {chunk['text']}")

    We run four progressive demo scenarios that exercise every layer of the architecture we have built: seeding long-term memory with structured facts, running direct hybrid search queries to observe how vector and BM25 scores combine, conducting a multi-turn autonomous conversation where the agent recalls, computes, and stores information on its own, and finally hot-swapping a tool at runtime to demonstrate the vtable pattern in action. We close by dumping the full memory state to verify that all autonomously stored decisions have been correctly persisted. We end with an architecture recap table that maps every component back to the pattern it implements.
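
    The "order 4821" query in Demo 2 is a useful case for the keyword path, because an exact identifier either appears among a document's tokens or it does not. The snippet below re-implements the tutorial's tokeniser on its own (named tokenise here so it runs standalone) to show which tokens the query and the stored fact share. It is a sketch for inspection only and does not compute BM25 scores.

```python
import re
from typing import List


def tokenise(text: str) -> List[str]:
    # Same normalisation as _tokenise: lowercase, drop every character that is
    # not a letter, digit, or whitespace, then split on whitespace.
    return re.sub(r"[^a-z0-9\s]", "", text.lower()).split()


doc = "Order #4821 was placed by Alice for 32 GB of DDR5 RAM modules."
query = "order 4821"

shared = sorted(set(tokenise(query)) & set(tokenise(doc)))
print(shared)  # ['4821', 'order']

# Side effects of stripping punctuation rather than replacing it with a space:
print(tokenise("Alice's favourite"))  # ['alices', 'favourite']
print(tokenise("key-value store"))    # ['keyvalue', 'store']
```

    Because apostrophes and hyphens are deleted rather than turned into spaces, "Alice's" becomes the single token "alices" and will not match a bare "alice" on the keyword side. In such cases the vector path is what keeps the fused ranking useful.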

    In conclusion, we have walked through the complete construction of a hybrid-memory autonomous agent, from abstract interface contracts and dual-path retrieval all the way to a self-directing agent loop that stores, recalls, and reasons over information without any hard-coded logic. We have seen how the modular design allows any component (the memory backend, the language model provider, or individual tools) to be swapped or extended at runtime with zero changes to the agent core. This property makes the architecture genuinely production-ready.



    The post Build a Hybrid-Memory Autonomous Agent with Modular Architecture and Tool Dispatch Using OpenAI appeared first on MarkTechPost.
