🤖 Ghostwritten by Claude Opus 4.5 · Curated by Tom Hundley
This article was written by Claude Opus 4.5 and curated for publication by Tom Hundley.
The difference between a chatbot and an assistant is memory.
In Part 1 of this series, we built a RAG system that retrieves documents and generates grounded answers. In Part 2, we added basic conversational history. But there is a fundamental problem: every conversation starts fresh.
Consider this interaction:
User: "What is the vacation policy?"
Assistant: "You receive 20 days of PTO annually..."
[User closes browser, returns 3 days later]
User: "Can I carry those vacation days over to next year?"
Assistant: "I'm sorry, I don't have context about what vacation days
you're referring to. Could you please clarify?"

The user reasonably expected continuity. They got amnesia.
This is the stateless RAG problem. Your retrieval system has all the knowledge, but no memory of who asked what, when, or why. Every session is a blank slate. For simple Q&A, this is fine. For AI assistants, customer support bots, or any application where relationships develop over time, it is a critical limitation.
Memory transforms RAG from a search interface into an intelligent assistant. Here is what memory enables:
# Without memory
User: "What's the refund policy?"
Bot: "Returns accepted within 30 days..."
User: "What about electronics?"
Bot: "Could you clarify your question?"
# With memory
User: "What's the refund policy?"
Bot: "Returns accepted within 30 days..."
User: "What about electronics?"
Bot: "Electronics have a 15-day return window instead of 30 days..."

# Memory tracks user preferences
User: "Show me the documentation for Python"
# Memory stores: user prefers Python
...
User: "How do I authenticate?"
# System retrieves Python authentication docs (not Java, not Go)

# Memory builds context across sessions
Session 1: User asks about API rate limits
Session 2: User asks about webhooks
Session 3: User asks about error handling
# Memory recognizes: user is building an integration
# System can proactively mention the integration guide

Long-term memory allows your AI to remember who the user is, what they prefer, and what they have asked about before, across sessions rather than within a single chat.
Memory adds complexity and cost. Not every RAG system needs it.
| Skip memory when | Reasoning |
|---|---|
| One-shot Q&A | Users ask isolated questions, no follow-ups |
| Public knowledge base | Anonymous users, no personalization needed |
| High-volume, low-touch | Support deflection where context rarely matters |
| Strict privacy requirements | Cannot store user conversation data |
| Cost-sensitive deployment | Every memory operation has infrastructure cost |
| Add memory when | Reasoning |
|---|---|
| Multi-turn conversations | Users ask follow-up questions referencing prior turns |
| Authenticated users | Known users who return across sessions |
| Complex workflows | Multi-step processes spanning multiple interactions |
| Customer relationships | Support agents who need context on user history |
| Personalized experience | Adapting responses based on user preferences |
Rule of thumb: If users regularly ask "as I mentioned earlier" or "following up on my last question," you need memory.
Memory in conversational RAG is not separate from retrieval. It is another retrieval source. Your system now has two knowledge bases:
+----------------------------------------------------+
|           CONVERSATIONAL RAG WITH MEMORY            |
+----------------------------------------------------+

                  +--------------+
                  |  User Query  |
                  +--------------+
                    |          |
                    v          v
    +------------------+    +------------------+
    |   MEMORY STORE   |    |  KNOWLEDGE BASE  |
    |                  |    |                  |
    |  - Chat history  |    |  - Documents     |
    |  - User facts    |    |  - Vector store  |
    |  - Preferences   |    |  - Metadata      |
    +------------------+    +------------------+
              |                      |
              +----------+-----------+
                         v
    +-----------------------------------------+
    |            CONTEXT ASSEMBLY             |
    |   Merge memory + retrieved documents    |
    +-----------------------------------------+
                         |
                         v
                +------------------+
                |     LLM + RAG    |
                |    Generation    |
                +------------------+

The key insight: memory retrieval follows the same patterns as document retrieval. You can use the same vector search, the same relevance scoring, the same context window management.
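To make the parallel concrete, here is a minimal sketch that treats memory as just another retriever. It assumes two LangChain retrievers already exist (`doc_retriever` over the knowledge base and `memory_retriever` over stored memories); both names are placeholders for illustration, not anything a library provides.

```python
# Illustrative sketch: memory is simply a second retrieval source.
# Assumes `doc_retriever` and `memory_retriever` are existing LangChain
# retrievers (e.g., vector stores exposed via .as_retriever()).

def assemble_context(question: str, doc_retriever, memory_retriever, k: int = 4) -> str:
    """Retrieve from both sources and merge them into a single prompt context."""
    doc_hits = doc_retriever.invoke(question)[:k]
    memory_hits = memory_retriever.invoke(question)[:k]

    doc_block = "\n\n".join(d.page_content for d in doc_hits)
    memory_block = "\n".join(f"- {m.page_content}" for m in memory_hits)

    return (
        f"KNOWLEDGE BASE:\n{doc_block}\n\n"
        f"RELEVANT MEMORIES:\n{memory_block}"
    )
```

The rest of this article is about where those memories come from, how to keep them relevant, and what it costs to maintain them.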
Short-term memory maintains context within a single conversation session. These are the building blocks.
The simplest approach: store every message.
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough
# Initialize buffer memory
memory = ConversationBufferMemory(
return_messages=True,
memory_key="chat_history"
)
# RAG prompt with memory
prompt = ChatPromptTemplate.from_messages([
("system", """You are a helpful assistant with access to a knowledge base.
Use the following context to answer questions:
{context}
Consider the conversation history when interpreting the user's question.
Resolve pronouns and references based on prior messages."""),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{question}")
])
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def run_conversation_turn(question: str, retriever, memory) -> str:
"""Execute one turn of a RAG conversation with memory."""
# 1. Load memory into context
memory_vars = memory.load_memory_variables({})
chat_history = memory_vars.get("chat_history", [])
# 2. Retrieve relevant documents
# Consider using conversation context to enhance retrieval
docs = retriever.invoke(question)
context = "\n\n".join(doc.page_content for doc in docs)
# 3. Generate response
chain = prompt | llm
response = chain.invoke({
"context": context,
"chat_history": chat_history,
"question": question
})
# 4. Save the interaction to memory
memory.save_context(
{"input": question},
{"output": response.content}
)
return response.content
# Usage
response1 = run_conversation_turn("What is the vacation policy?", retriever, memory)
print(response1)
response2 = run_conversation_turn("Can I carry them over?", retriever, memory)
print(response2) # Understands "them" refers to vacation days

Pros: nothing is ever lost; trivially simple to implement and debug.
Cons: grows without bound; token usage climbs every turn until you hit the context limit.
Use when: Conversations are short (under 10 turns) or context window is large.
Keep only the last N exchanges.
from langchain.memory import ConversationBufferWindowMemory
# Keep last 5 exchanges (10 messages: 5 human + 5 AI)
memory = ConversationBufferWindowMemory(
k=5,
return_messages=True,
memory_key="chat_history"
)
# Simulate a long conversation
exchanges = [
("What products do you sell?", "We sell software and services..."),
("Tell me about the software", "Our software includes..."),
("What about pricing?", "Our pricing model is..."),
("Do you offer discounts?", "Yes, we offer volume discounts..."),
("What's the refund policy?", "Refunds are available within 30 days..."),
("Can I get a demo?", "Yes, demos are available..."),
]
for human, ai in exchanges:
memory.save_context({"input": human}, {"output": ai})
# Check what's retained
print(memory.load_memory_variables({}))
# Only shows last 5 exchanges - "What products do you sell?" is gone

Pros: bounded size and predictable token cost; no extra LLM calls.
Cons: anything older than the window disappears abruptly, even if it was important.
Use when: Recent context is most important and early messages can be discarded.
Use the LLM to compress history into a summary.
from langchain.memory import ConversationSummaryMemory
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Memory that summarizes as it grows
memory = ConversationSummaryMemory(
llm=llm,
return_messages=True,
memory_key="chat_history"
)
# Long conversation about a complex topic
exchanges = [
("I need help setting up SSO for my organization",
"I'll help with SSO setup. First, which identity provider do you use?"),
("We use Okta",
"Great, Okta integrates well. You'll need to configure a SAML app..."),
("I created the SAML app, now what?",
"Now configure the assertion consumer service URL..."),
("Done. Where do I add the metadata?",
"In your admin dashboard, go to Settings > SSO > Add Provider..."),
("I'm getting an error: 'Invalid signature'",
"That usually means the certificate wasn't uploaded correctly..."),
]
for human, ai in exchanges:
memory.save_context({"input": human}, {"output": ai})
# Memory now contains a summary, not full history
print(memory.load_memory_variables({}))
# Output: "The user is setting up SSO with Okta. They've created the SAML app
# and configured the assertion consumer service. Currently troubleshooting
# an 'Invalid signature' error related to certificate upload."

Pros: history stays compact no matter how long the conversation runs.
Cons: every update costs an LLM call and adds latency; fine-grained details can be lost in compression.
Use when: Conversations are long but the gist matters more than exact words.
Combine summary for old messages with buffer for recent ones.
from langchain.memory import ConversationSummaryBufferMemory
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Keep recent messages in buffer, summarize older ones
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=500, # Summarize when buffer exceeds this
return_messages=True,
memory_key="chat_history"
)
# Build up a conversation
for i in range(10):
memory.save_context(
{"input": f"Question {i}: What about feature {i}?"},
{"output": f"Feature {i} works like this..."}
)
# Memory now contains:
# - Summary of exchanges 0-7
# - Full messages for exchanges 8-9 (recent)
print(memory.load_memory_variables({}))

Pros: recent turns stay verbatim while older context survives as a summary.
Cons: more knobs to tune (token limit, summarizer model) plus the ongoing cost of summarization.
Use when: You need both recent precision and long-term context.
For production systems, manage context window explicitly:
import tiktoken
from typing import List, Dict
def truncate_messages_to_token_limit(
messages: List[Dict[str, str]],
max_tokens: int = 4000,
model: str = "gpt-4o"
) -> List[Dict[str, str]]:
"""
Truncate message history to fit within token limit.
Preserves:
1. System message (always)
2. Most recent messages (priority)
3. Oldest message if space permits (for context)
Args:
messages: List of {"role": str, "content": str} dicts
max_tokens: Maximum tokens for conversation history
model: Model name for tokenizer selection
Returns:
Truncated message list fitting within token limit
"""
encoding = tiktoken.encoding_for_model(model)
def count_tokens(msgs):
return sum(len(encoding.encode(m["content"])) + 4 for m in msgs)
# Always keep system message if present
system_msgs = [m for m in messages if m["role"] == "system"]
conversation_msgs = [m for m in messages if m["role"] != "system"]
if not conversation_msgs:
return system_msgs
system_tokens = count_tokens(system_msgs)
available_tokens = max_tokens - system_tokens
# Start with most recent messages
result = []
for msg in reversed(conversation_msgs):
test_result = [msg] + result
if count_tokens(test_result) <= available_tokens:
result = test_result
else:
break
return system_msgs + result
# Usage
messages = [
{"role": "system", "content": "You are a helpful assistant..."},
{"role": "user", "content": "First question..."},
{"role": "assistant", "content": "First answer..."},
# ... many more messages ...
{"role": "user", "content": "Latest question..."},
]
truncated = truncate_messages_to_token_limit(messages, max_tokens=2000)

Before retrieval, rewrite the query to include conversation context:
from typing import List, Dict

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
def rewrite_query_with_context(
current_query: str,
chat_history: List[Dict[str, str]],
llm: ChatOpenAI
) -> str:
"""
Rewrite a query to be standalone by incorporating conversation context.
Transforms:
History: "What is the vacation policy?" -> "You get 20 days..."
Query: "Can I carry them over?"
Into:
"Can I carry unused vacation days over to the next year?"
"""
if not chat_history:
return current_query
rewrite_prompt = ChatPromptTemplate.from_messages([
("system", """Given the following conversation and a follow-up question,
rephrase the follow-up question to be a standalone question that includes
all necessary context from the conversation.
Do NOT answer the question, just rephrase it.
Chat History:
{chat_history}"""),
("human", "Follow-up question: {question}\n\nStandalone question:")
])
# Format chat history
history_str = "\n".join([
f"{msg['role'].title()}: {msg['content']}"
for msg in chat_history
])
chain = rewrite_prompt | llm
response = chain.invoke({
"chat_history": history_str,
"question": current_query
})
return response.content.strip()
# Example
history = [
{"role": "user", "content": "What is the vacation policy?"},
{"role": "assistant", "content": "Employees receive 20 days of PTO annually."}
]
rewritten = rewrite_query_with_context(
"Can I carry them over?",
history,
ChatOpenAI(model="gpt-4o-mini")
)
print(rewritten)
# Output: "Can I carry unused vacation days or PTO over to the next year?"

This rewritten query produces much better retrieval results than the original pronoun-heavy version.
Short-term memory handles individual sessions. Long-term memory persists across sessions, building user models and retaining important information indefinitely.
Long-term memory is harder than it sounds. You have to decide what is worth storing, keep stored facts from going stale, retrieve only what is relevant to the current query, respect privacy obligations, and pay for the storage and LLM calls involved.
Several specialized systems have emerged to address these challenges.
Zep is a dedicated memory layer for LLM applications. It extracts facts, builds knowledge graphs, and provides semantic search over memories.
from zep_cloud.client import Zep
from zep_cloud import Message
# Initialize Zep client
zep = Zep(api_key="your-zep-api-key")
# Create a user
user_id = "user_alice"
zep.user.add(user_id=user_id, email="alice@example.com")
# Create a session (conversation)
session_id = "session_abc123"
zep.memory.add_session(session_id=session_id, user_id=user_id)
def add_memory_and_query(session_id: str, messages: list):
"""
Add messages to Zep memory and query for relevant context.
Zep automatically:
- Extracts entities and facts
- Builds a knowledge graph of relationships
- Provides semantic search over memory
- Summarizes long conversations
"""
# Add messages to memory
zep_messages = [
Message(role=msg["role"], content=msg["content"])
for msg in messages
]
zep.memory.add(session_id=session_id, messages=zep_messages)
# Get memory context for a query
memory = zep.memory.get(
session_id=session_id,
min_rating=0.5 # Only retrieve high-relevance memories
)
return memory
# Zep also provides knowledge graph queries
def get_user_knowledge(user_id: str, query: str):
"""
Query the user's knowledge graph.
Zep builds a graph of entities and relationships mentioned
in conversations. You can query this graph to understand
what the user has discussed over time.
"""
results = zep.graph.search(
user_id=user_id,
query=query,
limit=10
)
return results
# Example: Find what we know about the user's projects
projects = get_user_knowledge("user_alice", "current projects")

Zep Architecture:
+------------------------------------------------------------------------+
|                            ZEP MEMORY SERVER                           |
+------------------------------------------------------------------------+

  +---------------+    +--------------------+    +----------------------+
  |   INGESTION   |--->|     EXTRACTION     |--->|   KNOWLEDGE GRAPH    |
  |               |    |                    |    |                      |
  |  - Messages   |    |  - Entity NER      |    |  - Nodes (entities)  |
  |  - Documents  |    |  - Fact extraction |    |  - Edges (relations) |
  |  - Metadata   |    |  - Summarization   |    |  - Temporal data     |
  +---------------+    +--------------------+    +----------+-----------+
                                                            |
                                                            v
  +----------------------------------------------------------------------+
  |                            RETRIEVAL LAYER                           |
  |                                                                      |
  |   Semantic Search | Graph Queries | Temporal Filtering | Summaries   |
  +----------------------------------------------------------------------+

Key Features: automatic entity and fact extraction, a knowledge graph of entities and relationships built from conversations, semantic and temporal search over memories, and summarization of long sessions.
When to use Zep: you want a managed, purpose-built memory service, you care about tracking relationships between entities mentioned across conversations, and you would rather not build extraction and retrieval pipelines yourself.
MemGPT takes a different approach: it gives the LLM control over its own memory through a hierarchical system inspired by operating system memory management.
from letta import create_client
# MemGPT (now called Letta) client
client = create_client()
# Create an agent with memory management
agent = client.create_agent(
name="memory_agent",
memory={
"human": "The user's name is Alice. She works in data science.",
"persona": "You are a helpful AI assistant with persistent memory."
},
llm_config={
"model": "gpt-4o",
"context_window": 8000
}
)
def chat_with_memory(agent_id: str, message: str):
"""
Chat with a MemGPT agent that manages its own memory.
MemGPT implements:
1. Main context: What's currently in the prompt
2. Archival memory: Long-term storage in a vector database
3. Recall memory: Conversation history searchable by date/content
The agent decides what to:
- Keep in main context
- Archive for later retrieval
- Search when needed
"""
response = client.send_message(
agent_id=agent_id,
message=message,
role="user"
)
return response
# The agent manages memory automatically
# Turn 1
response = chat_with_memory(agent.id, "I'm working on a machine learning project")
# Agent might archive: "User has a machine learning project (as of December 2025)"
# Turn 50
response = chat_with_memory(agent.id, "How's my project going?")
# Agent searches archival memory, finds the ML project reference

MemGPT Architecture:
+--------------------------------------------------------------------------+
|                          MemGPT MEMORY HIERARCHY                         |
+--------------------------------------------------------------------------+

  +----------------------------------------------------------------------+
  |                   MAIN CONTEXT (In-Context Memory)                   |
  |                      Limited by context window                       |
  |                                                                      |
  |  +-----------------+   +------------------+   +------------------+   |
  |  |   Core Memory   |   | Recent Messages  |   |   Active Facts   |   |
  |  |   (Persona +    |   |  (Last N turns)  |   |  (Current topic) |   |
  |  |   Human info)   |   |                  |   |                  |   |
  |  +-----------------+   +------------------+   +------------------+   |
  +-----------------------------------+----------------------------------+
                                      |
                   +------------------+-------------------+
                   v                                      v
  +------------------------------+    +------------------------------------+
  |         RECALL MEMORY        |    |          ARCHIVAL MEMORY           |
  |     (Conversation Store)     |    |       (Long-term Vector DB)        |
  |                              |    |                                    |
  |  - Searchable by date        |    |  - Unlimited capacity              |
  |  - Full conversation log     |    |  - Semantic search                 |
  |  - Agent can scroll back     |    |  - Agent-managed storage           |
  |                              |    |  - Explicit save/retrieve          |
  +------------------------------+    +------------------------------------+

  LLM FUNCTION CALLS FOR MEMORY:
  - archival_memory_insert(content)   - Save to long-term memory
  - archival_memory_search(query)     - Retrieve from long-term memory
  - conversation_search(query)        - Search conversation history
  - core_memory_replace(key, value)   - Update working memory

Key Innovation: The LLM has function-calling tools for memory management. It decides what to archive, when to retrieve, and how to organize information. This mirrors how humans actively manage attention and memory.
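The Letta client above hides this machinery, but the underlying pattern is easy to sketch: expose memory operations as tools and let the model decide when to call them. The snippet below is an illustration only; the tool names mirror MemGPT's, while the storage is a plain Python list rather than Letta's actual archival store.

```python
# Illustrative sketch of the MemGPT-style pattern: memory operations exposed
# as tools the LLM can call. Storage here is a plain list, for demonstration only.
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

archival_store: list[str] = []

@tool
def archival_memory_insert(content: str) -> str:
    """Save a piece of information to long-term archival memory."""
    archival_store.append(content)
    return f"Stored: {content}"

@tool
def archival_memory_search(query: str) -> str:
    """Search archival memory (naive substring match in this sketch)."""
    hits = [m for m in archival_store if query.lower() in m.lower()]
    return "\n".join(hits) or "No matching memories."

llm_with_memory_tools = ChatOpenAI(model="gpt-4o").bind_tools(
    [archival_memory_insert, archival_memory_search]
)
# The model now decides when to store or search; a production version would
# back these tools with a vector store and loop over the returned tool calls.
```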
When to use MemGPT (Letta): long-running autonomous agents that must manage very large histories on their own, where you want the model itself to decide what to remember, archive, and retrieve.
LangMem integrates with LangGraph to add memory capabilities to stateful agent workflows.
import json

from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState
# Memory store for facts and user information
store = InMemoryStore()
# Checkpointer for conversation state
checkpointer = MemorySaver()
# Define the graph state
class AgentState(MessagesState):
user_facts: list[str]
current_topic: str
def extract_facts(state: AgentState) -> dict:
"""
Extract memorable facts from the conversation.
Uses an LLM to identify information worth persisting:
- User preferences
- Important statements
- Goals and projects
"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
extraction_prompt = """Review this conversation and extract facts
worth remembering about the user. Return as a JSON list of strings.
Messages: {messages}
Previously known facts: {known_facts}
New facts (JSON list):"""
response = llm.invoke(extraction_prompt.format(
messages=state["messages"][-5:], # Last 5 messages
known_facts=state.get("user_facts", [])
))
# Parse and merge facts
new_facts = json.loads(response.content)
all_facts = list(set(state.get("user_facts", []) + new_facts))
return {"user_facts": all_facts}
def query_with_memory(state: AgentState) -> dict:
"""
Answer query using conversation + stored facts.
"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
facts_context = "\n".join(f"- {fact}" for fact in state.get("user_facts", []))
system_prompt = f"""You are a helpful assistant with memory of this user.
Known facts about the user:
{facts_context}
Use these facts to personalize your response when relevant."""
messages = [{"role": "system", "content": system_prompt}] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
# Build the graph
workflow = StateGraph(AgentState)
workflow.add_node("extract_facts", extract_facts)
workflow.add_node("respond", query_with_memory)
workflow.add_edge("extract_facts", "respond")
workflow.set_entry_point("extract_facts")
workflow.set_finish_point("respond")
# Compile with memory
app = workflow.compile(checkpointer=checkpointer, store=store)
# Run with thread ID for persistence
config = {"configurable": {"thread_id": "user_123"}}
# First session
result = app.invoke(
{"messages": [{"role": "user", "content": "I'm a Python developer working on ML"}]},
config
)
# Later session (facts persist)
result = app.invoke(
{"messages": [{"role": "user", "content": "Can you help me with my code?"}]},
config
)
# Agent remembers: Python developer, ML focus

When to use LangMem/LangGraph: you are already building stateful agent workflows in LangGraph and want memory woven into the graph itself, with full control over the extraction and personalization logic.
Sometimes you need full control. Here is a production pattern for custom long-term memory:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Pinecone
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import json
class ConversationalMemory:
"""
Production-grade conversational memory system.
Features:
- Semantic search over past interactions
- User-scoped memory (multi-tenant)
- Time-weighted relevance
- Fact extraction and storage
- Memory decay and cleanup
"""
def __init__(
self,
index_name: str,
embeddings: OpenAIEmbeddings,
namespace_prefix: str = "memory"
):
self.vectorstore = Pinecone.from_existing_index(
index_name=index_name,
embedding=embeddings,
namespace=namespace_prefix
)
self.embeddings = embeddings
self.namespace_prefix = namespace_prefix
def _get_user_namespace(self, user_id: str) -> str:
"""Namespace per user for multi-tenant isolation."""
return f"{self.namespace_prefix}_{user_id}"
def store_interaction(
self,
user_id: str,
messages: List[Dict[str, str]],
metadata: Optional[Dict] = None
) -> str:
"""
Store a conversation turn in memory.
Args:
user_id: Unique user identifier
messages: List of messages in this turn
metadata: Additional context (topic, intent, etc.)
Returns:
Memory ID for reference
"""
# Create a summary of the interaction
interaction_text = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in messages
])
doc_metadata = {
"user_id": user_id,
"timestamp": datetime.utcnow().isoformat(),
"message_count": len(messages),
"type": "interaction",
**(metadata or {})
}
# Store in user-specific namespace
ids = self.vectorstore.add_texts(
texts=[interaction_text],
metadatas=[doc_metadata],
namespace=self._get_user_namespace(user_id)
)
return ids[0]
def store_fact(
self,
user_id: str,
fact: str,
confidence: float = 1.0,
source: str = "extracted"
) -> str:
"""
Store a fact about the user.
Args:
user_id: User identifier
fact: The fact to store ("User prefers Python")
confidence: How confident we are (0-1)
source: Where this fact came from
Returns:
Fact ID
"""
metadata = {
"user_id": user_id,
"timestamp": datetime.utcnow().isoformat(),
"type": "fact",
"confidence": confidence,
"source": source
}
ids = self.vectorstore.add_texts(
texts=[fact],
metadatas=[metadata],
namespace=self._get_user_namespace(user_id)
)
return ids[0]
def retrieve_memories(
self,
user_id: str,
query: str,
k: int = 5,
time_weight: float = 0.1,
min_score: float = 0.5
) -> List[Dict]:
"""
Retrieve relevant memories for a query.
Args:
user_id: User identifier
query: Current query to match against memories
k: Number of memories to retrieve
time_weight: How much to weight recency (0 = ignore, 1 = heavy)
min_score: Minimum similarity score to include
Returns:
List of relevant memories with scores
"""
# Retrieve more than needed to allow for filtering
results = self.vectorstore.similarity_search_with_score(
query,
k=k * 2,
namespace=self._get_user_namespace(user_id)
)
# Apply time weighting
now = datetime.utcnow()
weighted_results = []
for doc, base_score in results:
if base_score < min_score:
continue
# Parse timestamp
timestamp = datetime.fromisoformat(doc.metadata["timestamp"])
days_old = (now - timestamp).days
# Apply exponential decay to older memories
time_factor = 1.0 / (1 + time_weight * days_old)
final_score = base_score * time_factor
weighted_results.append({
"content": doc.page_content,
"metadata": doc.metadata,
"base_score": base_score,
"final_score": final_score
})
# Sort by final score and return top k
weighted_results.sort(key=lambda x: x["final_score"], reverse=True)
return weighted_results[:k]
def get_user_facts(self, user_id: str, limit: int = 20) -> List[str]:
"""Get all stored facts about a user."""
# Query for fact-type documents
results = self.vectorstore.similarity_search(
"user facts preferences information",
k=limit,
namespace=self._get_user_namespace(user_id),
filter={"type": "fact"}
)
return [doc.page_content for doc in results]
def cleanup_old_memories(
self,
user_id: str,
days_old: int = 90,
keep_facts: bool = True
):
"""
Remove memories older than threshold.
Facts can optionally be preserved even if old.
"""
# Implementation depends on vector store capabilities
# Pinecone supports metadata filtering for deletion
cutoff = datetime.utcnow() - timedelta(days=days_old)
filter_dict = {
"timestamp": {"$lt": cutoff.isoformat()}
}
if keep_facts:
filter_dict["type"] = {"$ne": "fact"}
# Note: Actual deletion API varies by vector store
# This is a conceptual example
pass
# Usage
memory = ConversationalMemory(
index_name="user-memories",
embeddings=OpenAIEmbeddings()
)
# Store an interaction
memory.store_interaction(
user_id="alice_123",
messages=[
{"role": "user", "content": "I'm looking for help with Python async"},
{"role": "assistant", "content": "I'll help with async Python..."}
],
metadata={"topic": "python", "intent": "learning"}
)
# Store a fact
memory.store_fact(
user_id="alice_123",
fact="User is learning Python async/await patterns",
confidence=0.9,
source="conversation_extraction"
)
# Later: retrieve relevant memories
memories = memory.retrieve_memories(
user_id="alice_123",
query="help with concurrent programming",
k=5
)

The most powerful pattern combines conversational memory with document retrieval.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from typing import List, Dict
class MemoryAugmentedRAG:
"""
RAG system that combines:
1. Document retrieval (knowledge base)
2. Memory retrieval (conversation history + user facts)
3. Query rewriting with context
"""
def __init__(
self,
document_retriever,
memory_store: ConversationalMemory,
llm: ChatOpenAI
):
self.document_retriever = document_retriever
self.memory_store = memory_store
self.llm = llm
def _rewrite_query(
self,
query: str,
memories: List[Dict],
chat_history: List[Dict]
) -> str:
"""Rewrite query incorporating memory and history context."""
if not memories and not chat_history:
return query
memory_context = "\n".join([
f"- {m['content']}" for m in memories[:3]
])
history_context = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in chat_history[-4:]
])
prompt = ChatPromptTemplate.from_template("""
Given this context, rewrite the query to be more specific and standalone.
Relevant memories about this user:
{memory_context}
Recent conversation:
{history_context}
Original query: {query}
Rewritten query (be specific, resolve pronouns, incorporate relevant context):""")
chain = prompt | self.llm
result = chain.invoke({
"memory_context": memory_context or "None",
"history_context": history_context or "None",
"query": query
})
return result.content.strip()
def query(
self,
user_id: str,
query: str,
chat_history: List[Dict] = None
) -> Dict:
"""
Execute a memory-augmented RAG query.
Steps:
1. Retrieve relevant memories
2. Rewrite query with context
3. Retrieve documents
4. Generate response with all context
5. Store the interaction in memory
"""
chat_history = chat_history or []
# 1. Retrieve memories
memories = self.memory_store.retrieve_memories(
user_id=user_id,
query=query,
k=5
)
user_facts = self.memory_store.get_user_facts(user_id, limit=10)
# 2. Rewrite query with context
enhanced_query = self._rewrite_query(query, memories, chat_history)
# 3. Retrieve documents
documents = self.document_retriever.invoke(enhanced_query)
doc_context = "\n\n---\n\n".join([
doc.page_content for doc in documents
])
# 4. Generate response
facts_str = "\n".join(f"- {fact}" for fact in user_facts)
memory_str = "\n".join(f"- {m['content'][:200]}" for m in memories[:3])
generation_prompt = ChatPromptTemplate.from_messages([
("system", """You are a helpful assistant with access to documentation and memory of past interactions.
KNOWLEDGE BASE DOCUMENTS:
{documents}
USER FACTS (what you know about this user):
{user_facts}
RELEVANT PAST INTERACTIONS:
{memories}
Use the documentation to answer factually. Use memories and user facts to personalize.
If you don't know something, say so rather than making it up."""),
("human", "{query}")
])
chain = generation_prompt | self.llm
response = chain.invoke({
"documents": doc_context,
"user_facts": facts_str or "None stored yet",
"memories": memory_str or "None relevant",
"query": query
})
# 5. Store this interaction
self.memory_store.store_interaction(
user_id=user_id,
messages=[
{"role": "user", "content": query},
{"role": "assistant", "content": response.content}
]
)
return {
"answer": response.content,
"sources": documents,
"memories_used": memories,
"enhanced_query": enhanced_query
}

Not every query needs memory. Here is a routing pattern:
from enum import Enum
from typing import List, Dict

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
class QueryType(Enum):
KNOWLEDGE = "knowledge" # Pure knowledge base query
MEMORY = "memory" # Pure memory query
HYBRID = "hybrid" # Needs both
CLARIFICATION = "clarification" # Needs previous turn only
def classify_query(
query: str,
chat_history: List[Dict],
llm: ChatOpenAI
) -> QueryType:
"""
Classify a query to determine retrieval strategy.
Examples:
- "What is the refund policy?" -> KNOWLEDGE
- "What did I ask about last week?" -> MEMORY
- "Tell me more about the refund policy I asked about" -> HYBRID
- "What do you mean?" -> CLARIFICATION
"""
prompt = ChatPromptTemplate.from_template("""
Classify this query into one of these categories:
KNOWLEDGE: Query about factual information, documentation, policies, features
MEMORY: Query about past interactions, user history, "what did I", "remember when"
HYBRID: Query that references both past interactions AND needs factual knowledge
CLARIFICATION: Query that only needs the immediate prior message to understand
Query: {query}
Recent history (for context):
{history}
Category (respond with exactly one word: KNOWLEDGE, MEMORY, HYBRID, or CLARIFICATION):""")
history_str = "\n".join([
f"{m['role']}: {m['content'][:100]}"
for m in (chat_history or [])[-3:]
])
chain = prompt | llm
result = chain.invoke({
"query": query,
"history": history_str or "None"
})
category = result.content.strip().upper()
try:
return QueryType[category]
except KeyError:
return QueryType.HYBRID # Default to hybrid if unclear
# Use in routing
def route_query(query: str, chat_history: List[Dict], llm: ChatOpenAI):
query_type = classify_query(query, chat_history, llm)
if query_type == QueryType.KNOWLEDGE:
# Only search knowledge base
return {"search_docs": True, "search_memory": False}
elif query_type == QueryType.MEMORY:
# Only search memory
return {"search_docs": False, "search_memory": True}
elif query_type == QueryType.CLARIFICATION:
# Only use recent chat history
return {"search_docs": False, "search_memory": False, "use_history": True}
else: # HYBRID
return {"search_docs": True, "search_memory": True}

| Storage | Use Case | Pros | Cons |
|---|---|---|---|
| Redis | Session memory, short-term | Fast, built-in TTL | No semantic search |
| PostgreSQL + pgvector | Full memory system | Transactional, familiar | Self-managed |
| Pinecone/Weaviate | Long-term vector memory | Managed, semantic search | Cost at scale |
| Zep | Complete memory solution | Purpose-built, knowledge graphs | Another service |
| SQLite | Local/dev memory | Simple, portable | Not scalable |
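The Redis option from this table is implemented below. For the PostgreSQL + pgvector row, a minimal sketch might look like the following; the `memories` table schema and the `PgVectorMemory` class are assumptions made for illustration, not an existing API.

```python
# Sketch only: long-term memory on PostgreSQL + pgvector.
# Assumes this schema has been created:
#   CREATE EXTENSION IF NOT EXISTS vector;
#   CREATE TABLE memories (
#       id BIGSERIAL PRIMARY KEY,
#       user_id TEXT NOT NULL,
#       content TEXT NOT NULL,
#       embedding VECTOR(1536),
#       created_at TIMESTAMPTZ DEFAULT now()
#   );
import psycopg2
from langchain_openai import OpenAIEmbeddings

class PgVectorMemory:
    """Hypothetical memory store backed by Postgres + pgvector."""

    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

    def _to_vector_literal(self, values: list[float]) -> str:
        # pgvector accepts a '[v1,v2,...]' text literal
        return "[" + ",".join(str(v) for v in values) + "]"

    def store(self, user_id: str, content: str) -> None:
        vec = self._to_vector_literal(self.embeddings.embed_query(content))
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "INSERT INTO memories (user_id, content, embedding) "
                "VALUES (%s, %s, %s::vector)",
                (user_id, content, vec),
            )

    def retrieve(self, user_id: str, query: str, k: int = 5) -> list[str]:
        vec = self._to_vector_literal(self.embeddings.embed_query(query))
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "SELECT content FROM memories "
                "WHERE user_id = %s "
                "ORDER BY embedding <=> %s::vector "  # cosine distance
                "LIMIT %s",
                (user_id, vec, k),
            )
            return [row[0] for row in cur.fetchall()]
```

For fast, ephemeral session memory with automatic expiration, Redis is the simpler fit: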
import redis
import json
from datetime import timedelta
from typing import List, Dict
class RedisSessionMemory:
"""Fast session memory with automatic expiration."""
def __init__(self, redis_url: str, ttl_hours: int = 24):
self.redis = redis.from_url(redis_url)
self.ttl = timedelta(hours=ttl_hours)
def _key(self, session_id: str) -> str:
return f"memory:session:{session_id}"
def add_message(self, session_id: str, role: str, content: str):
"""Add a message to session memory."""
key = self._key(session_id)
message = {"role": role, "content": content}
self.redis.rpush(key, json.dumps(message))
self.redis.expire(key, self.ttl)
def get_history(self, session_id: str, limit: int = 50) -> List[Dict]:
"""Get recent messages for a session."""
key = self._key(session_id)
# Get last N messages
messages = self.redis.lrange(key, -limit, -1)
return [json.loads(m) for m in messages]
def clear_session(self, session_id: str):
"""Clear a session's memory."""
self.redis.delete(self._key(session_id))

Critical for production: users must never see each other's memories.
class TenantIsolatedMemory:
"""Memory system with strict tenant isolation."""
def __init__(self, vectorstore_factory):
self.vectorstore_factory = vectorstore_factory
self._stores = {} # Cache of per-tenant stores
def _get_store(self, tenant_id: str):
"""Get or create a tenant-specific store."""
if tenant_id not in self._stores:
# Each tenant gets their own namespace/collection
self._stores[tenant_id] = self.vectorstore_factory(
namespace=f"tenant_{tenant_id}"
)
return self._stores[tenant_id]
def store(self, tenant_id: str, user_id: str, content: str, metadata: dict):
"""Store with tenant + user isolation."""
store = self._get_store(tenant_id)
# Double-tag with tenant and user
full_metadata = {
**metadata,
"tenant_id": tenant_id,
"user_id": user_id
}
store.add_texts([content], [full_metadata])
def retrieve(self, tenant_id: str, user_id: str, query: str, k: int = 5):
"""Retrieve with strict tenant + user filtering."""
store = self._get_store(tenant_id)
# Filter ensures isolation
results = store.similarity_search(
query,
k=k,
filter={
"tenant_id": tenant_id,
"user_id": user_id
}
)
return results

Memory without eviction grows forever. Implement cleanup:
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List
class EvictionStrategy(Enum):
TIME_BASED = "time" # Delete after N days
COUNT_BASED = "count" # Keep only last N
IMPORTANCE = "importance" # Keep high-importance, delete low
HYBRID = "hybrid" # Combination
class MemoryEvictionManager:
"""Manage memory lifecycle with configurable eviction."""
def __init__(
self,
strategy: EvictionStrategy = EvictionStrategy.HYBRID,
max_age_days: int = 90,
max_count: int = 1000,
importance_threshold: float = 0.3
):
self.strategy = strategy
self.max_age_days = max_age_days
self.max_count = max_count
self.importance_threshold = importance_threshold
def should_evict(
self,
memory: Dict,
current_count: int
) -> bool:
"""Determine if a memory should be evicted."""
if self.strategy == EvictionStrategy.TIME_BASED:
age = datetime.utcnow() - datetime.fromisoformat(memory["timestamp"])
return age.days > self.max_age_days
elif self.strategy == EvictionStrategy.COUNT_BASED:
return current_count > self.max_count
elif self.strategy == EvictionStrategy.IMPORTANCE:
return memory.get("importance", 0) < self.importance_threshold
else: # HYBRID
age = datetime.utcnow() - datetime.fromisoformat(memory["timestamp"])
importance = memory.get("importance", 0.5)
# Old + unimportant = evict
if age.days > self.max_age_days and importance < self.importance_threshold:
return True
# Very old = evict regardless (unless marked critical)
if age.days > self.max_age_days * 2 and not memory.get("critical"):
return True
# Count exceeded and not important
if current_count > self.max_count and importance < 0.7:
return True
return False
def calculate_importance(self, memory: Dict, user_interactions: List) -> float:
"""
Calculate memory importance based on:
- Recency of access
- Frequency of relevance
- Explicit user markers
"""
base_importance = 0.5
# Recently accessed memories are more important
if "last_accessed" in memory:
days_since_access = (datetime.utcnow() -
datetime.fromisoformat(memory["last_accessed"])).days
if days_since_access < 7:
base_importance += 0.2
elif days_since_access < 30:
base_importance += 0.1
# Frequently retrieved memories are important
access_count = memory.get("access_count", 0)
if access_count > 10:
base_importance += 0.2
elif access_count > 5:
base_importance += 0.1
# Facts are more important than interactions
if memory.get("type") == "fact":
base_importance += 0.1
return min(base_importance, 1.0)

Summarization memory adds LLM calls. Calculate the cost:
def estimate_memory_costs(
messages_per_day: int,
avg_message_length: int,
summarization_frequency: int, # Summarize every N messages
model: str = "gpt-4o-mini"
) -> dict:
"""
Estimate monthly costs for memory summarization.
Returns breakdown of token usage and costs.
"""
# Token estimates
input_tokens_per_message = avg_message_length // 4 # Rough estimate
summary_input_tokens = summarization_frequency * input_tokens_per_message
summary_output_tokens = 200 # Typical summary length
# Daily summarization calls
summarizations_per_day = messages_per_day // summarization_frequency
# Monthly totals
monthly_summarizations = summarizations_per_day * 30
monthly_input_tokens = monthly_summarizations * summary_input_tokens
monthly_output_tokens = monthly_summarizations * summary_output_tokens
# Pricing (as of late 2025)
pricing = {
"gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
"gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
}
model_pricing = pricing.get(model, pricing["gpt-4o-mini"])
monthly_cost = (
monthly_input_tokens * model_pricing["input"] +
monthly_output_tokens * model_pricing["output"]
)
return {
"monthly_summarizations": monthly_summarizations,
"monthly_input_tokens": monthly_input_tokens,
"monthly_output_tokens": monthly_output_tokens,
"monthly_cost_usd": round(monthly_cost, 2),
"cost_per_user_usd": round(monthly_cost, 4), # Per user if this is per-user
}
# Example: 100 messages/day, summarize every 10
costs = estimate_memory_costs(
messages_per_day=100,
avg_message_length=200,
summarization_frequency=10,
model="gpt-4o-mini"
)
print(f"Monthly memory cost: ${costs['monthly_cost_usd']}")
# Output: Monthly memory cost: $0.06 (negligible with the mini model)

Problem: Buffer memory grows unbounded, eventually exceeding context limits.
# BAD: Unbounded buffer
memory = ConversationBufferMemory()
# After 100 turns, this contains 200+ messages
# GOOD: Bounded with fallback
class BoundedMemory:
def __init__(self, max_messages: int = 20, summarize_after: int = 10):
self.buffer = []
self.max_messages = max_messages
self.summarize_after = summarize_after
self.summary = ""
def add(self, message: dict):
self.buffer.append(message)
if len(self.buffer) > self.max_messages:
# Summarize old messages before dropping
old_messages = self.buffer[:self.summarize_after]
self.summary = self._summarize(old_messages, self.summary)
self.buffer = self.buffer[self.summarize_after:]
def get_context(self) -> str:
context = ""
if self.summary:
context += f"Previous conversation summary: {self.summary}\n\n"
context += "Recent messages:\n"
for msg in self.buffer:
context += f"{msg['role']}: {msg['content']}\n"
return context

Problem: If the AI hallucinates a fact and it gets stored in memory, that hallucination persists and may be reinforced.
# Example of the problem:
# Turn 1: User asks about pricing
# Turn 1: AI hallucinates "Enterprise plan is $999/month" (wrong)
# Memory stores this hallucination
# Turn 10: User asks about enterprise pricing
# Memory retrieves the hallucinated fact
# AI confidently repeats the wrong price
# Solution: Validate before storing
def store_with_validation(
content: str,
source: str,
memory_store,
knowledge_base
):
"""Only store facts that can be verified against knowledge base."""
if source == "user_statement":
# User statements are stored as-is (they know their own situation)
memory_store.store(content, verified=True)
elif source == "ai_response":
# AI responses should be verified against knowledge base
verification = knowledge_base.verify(content)
if verification.is_factual:
memory_store.store(content, verified=True)
elif verification.is_subjective:
# Opinions and recommendations can be stored
memory_store.store(content, verified=False, type="recommendation")
else:
# Don't store potentially hallucinated facts
pass

Problem: Memory + retrieved documents + system prompt exceeds context window.
from typing import List

def assemble_context_safely(
system_prompt: str,
memories: List[str],
documents: List[str],
chat_history: List[dict],
user_query: str,
max_tokens: int = 8000,
model: str = "gpt-4o"
) -> dict:
"""
Assemble context while respecting token limits.
Priority order:
1. System prompt (always included)
2. User query (always included)
3. Most relevant documents
4. Most recent chat history
5. Most relevant memories
"""
import tiktoken
encoding = tiktoken.encoding_for_model(model)
def count(text: str) -> int:
return len(encoding.encode(text))
# Reserved tokens
reserved = count(system_prompt) + count(user_query) + 500 # Buffer for response
available = max_tokens - reserved
# Allocate budget (adjustable)
doc_budget = int(available * 0.5)
history_budget = int(available * 0.3)
memory_budget = int(available * 0.2)
# Fill documents up to budget
included_docs = []
doc_tokens = 0
for doc in documents:
tokens = count(doc)
if doc_tokens + tokens <= doc_budget:
included_docs.append(doc)
doc_tokens += tokens
else:
break
# Fill history up to budget
included_history = []
history_tokens = 0
for msg in reversed(chat_history): # Most recent first
tokens = count(f"{msg['role']}: {msg['content']}")
if history_tokens + tokens <= history_budget:
included_history.insert(0, msg)
history_tokens += tokens
else:
break
# Fill memories up to budget
included_memories = []
memory_tokens = 0
for memory in memories:
tokens = count(memory)
if memory_tokens + tokens <= memory_budget:
included_memories.append(memory)
memory_tokens += tokens
else:
break
return {
"system_prompt": system_prompt,
"documents": included_docs,
"chat_history": included_history,
"memories": included_memories,
"user_query": user_query,
"total_tokens": reserved + doc_tokens + history_tokens + memory_tokens
}

Problem: Long-term memory creates privacy obligations.
from datetime import datetime

class PrivacyAwareMemory:
"""Memory system with privacy controls."""
def __init__(self, memory_store, consent_store):
self.memory = memory_store
self.consent = consent_store
def store(self, user_id: str, content: str, metadata: dict):
"""Store only if user has consented to memory."""
consent = self.consent.get_user_consent(user_id)
if not consent.memory_enabled:
return # User has opted out of memory
# Check for sensitive content
if self._contains_pii(content):
if not consent.pii_storage_allowed:
content = self._redact_pii(content)
self.memory.store(user_id, content, metadata)
def get_user_data(self, user_id: str) -> dict:
"""GDPR/CCPA: Export all user data."""
return {
"memories": self.memory.get_all(user_id),
"consent_settings": self.consent.get_user_consent(user_id),
"export_date": datetime.utcnow().isoformat()
}
def delete_user_data(self, user_id: str):
"""GDPR/CCPA: Right to be forgotten."""
self.memory.delete_all(user_id)
self.consent.delete(user_id)
def _contains_pii(self, text: str) -> bool:
"""Detect personally identifiable information."""
import re
patterns = [
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
r'\b\d{16}\b', # Credit card
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', # Email
r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b', # Phone
]
return any(re.search(pattern, text) for pattern in patterns)
def _redact_pii(self, text: str) -> str:
"""Redact detected PII by replacing each match with [REDACTED]."""
import re
patterns = [r'\b\d{3}-\d{2}-\d{4}\b', r'\b\d{16}\b',
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'] # SSN, card, email, phone
for pattern in patterns:
text = re.sub(pattern, "[REDACTED]", text)
return text

Problem: Memory contains outdated information that conflicts with current facts.
# User: "My email is old@example.com"
# [Memory stores: email = old@example.com]
# ... months later ...
# User: "My email is new@example.com"
# [Memory now has conflicting facts]
class TemporalMemory:
"""Memory that tracks when facts were learned and handles updates."""
def store_fact(self, user_id: str, key: str, value: str):
"""Store a fact with timestamp, handling updates."""
existing = self.get_fact(user_id, key)
if existing and existing["value"] != value:
# Fact has changed - archive old, store new
self.archive_fact(user_id, key, existing)
self.memory.store({
"user_id": user_id,
"key": key,
"value": value,
"learned_at": datetime.utcnow().isoformat(),
"version": (existing["version"] + 1) if existing else 1
})
def get_fact(self, user_id: str, key: str) -> dict:
"""Get the current version of a fact."""
facts = self.memory.query(
user_id=user_id,
key=key,
order_by="version DESC",
limit=1
)
return facts[0] if facts else None
def get_fact_history(self, user_id: str, key: str) -> List[dict]:
"""Get full history of a fact's changes."""
return self.memory.query(
user_id=user_id,
key=key,
order_by="version ASC"
)

Bringing it all together:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Pinecone
from langchain_core.prompts import ChatPromptTemplate
from typing import List, Dict, Optional
import redis
import json
from datetime import datetime
class ProductionConversationalRAG:
"""
Production-ready conversational RAG with:
- Session memory (Redis)
- Long-term user memory (Pinecone)
- Document retrieval (Pinecone)
- Query rewriting
- Privacy controls
"""
def __init__(
self,
redis_url: str,
pinecone_index: str,
openai_api_key: str
):
# Session memory (fast, ephemeral)
self.redis = redis.from_url(redis_url)
# Embeddings
self.embeddings = OpenAIEmbeddings(
api_key=openai_api_key,
model="text-embedding-3-small"
)
# Document store
self.doc_store = Pinecone.from_existing_index(
index_name=pinecone_index,
embedding=self.embeddings,
namespace="documents"
)
# User memory store
self.memory_store = Pinecone.from_existing_index(
index_name=pinecone_index,
embedding=self.embeddings,
namespace="user_memories"
)
# LLMs
self.llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=openai_api_key)
self.fast_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, api_key=openai_api_key)
def _get_session_key(self, session_id: str) -> str:
return f"rag:session:{session_id}"
def _get_session_history(self, session_id: str, limit: int = 10) -> List[Dict]:
"""Get recent session messages from Redis."""
key = self._get_session_key(session_id)
messages = self.redis.lrange(key, -limit, -1)
return [json.loads(m) for m in messages]
def _save_to_session(self, session_id: str, role: str, content: str):
"""Save message to session memory."""
key = self._get_session_key(session_id)
self.redis.rpush(key, json.dumps({"role": role, "content": content}))
self.redis.expire(key, 86400) # 24 hour TTL
def _get_user_memories(self, user_id: str, query: str, k: int = 5) -> List[str]:
"""Retrieve relevant memories for a user."""
results = self.memory_store.similarity_search(
query,
k=k,
filter={"user_id": user_id}
)
return [doc.page_content for doc in results]
def _save_user_memory(self, user_id: str, content: str, memory_type: str = "interaction"):
"""Store a memory for a user."""
self.memory_store.add_texts(
texts=[content],
metadatas=[{
"user_id": user_id,
"type": memory_type,
"timestamp": datetime.utcnow().isoformat()
}]
)
def _rewrite_query(self, query: str, history: List[Dict]) -> str:
"""Rewrite query to be standalone."""
if not history:
return query
prompt = ChatPromptTemplate.from_template("""
Rewrite this follow-up question as a standalone question.
Chat history:
{history}
Follow-up: {query}
Standalone question:""")
history_str = "\n".join(f"{m['role']}: {m['content']}" for m in history[-4:])
chain = prompt | self.fast_llm
result = chain.invoke({"history": history_str, "query": query})
return result.content.strip()
def _retrieve_documents(self, query: str, k: int = 4) -> List[str]:
"""Retrieve relevant documents."""
results = self.doc_store.similarity_search(query, k=k)
return [doc.page_content for doc in results]
def query(
self,
session_id: str,
user_id: str,
query: str,
include_memory: bool = True
) -> Dict:
"""
Execute a conversational RAG query.
Args:
session_id: Current session identifier
user_id: User identifier for long-term memory
query: User's question
include_memory: Whether to include long-term memory
Returns:
Dict with answer, sources, and debug info
"""
# 1. Get session history
history = self._get_session_history(session_id)
# 2. Rewrite query if needed
enhanced_query = self._rewrite_query(query, history)
# 3. Retrieve documents
documents = self._retrieve_documents(enhanced_query)
# 4. Retrieve memories (if enabled)
memories = []
if include_memory and user_id:
memories = self._get_user_memories(user_id, enhanced_query)
# 5. Build context
doc_context = "\n\n---\n\n".join(documents)
memory_context = "\n".join(f"- {m}" for m in memories) if memories else "None"
# 6. Generate response
prompt = ChatPromptTemplate.from_messages([
("system", """You are a helpful assistant with access to documentation and memory.
DOCUMENTATION:
{documents}
USER MEMORIES:
{memories}
Answer based on the documentation. Use memories to personalize when relevant.
If unsure, say so. Cite sources when possible."""),
*[
(msg["role"], msg["content"])
for msg in history[-6:] # Include recent history
],
("human", "{query}")
])
chain = prompt | self.llm
response = chain.invoke({
"documents": doc_context,
"memories": memory_context,
"query": query
})
answer = response.content
# 7. Save to session
self._save_to_session(session_id, "user", query)
self._save_to_session(session_id, "assistant", answer)
# 8. Optionally extract and save facts to long-term memory
if include_memory and user_id:
# Fire-and-forget fact extraction (could be async)
self._extract_and_save_facts(user_id, query, answer)
return {
"answer": answer,
"enhanced_query": enhanced_query,
"documents_used": len(documents),
"memories_used": len(memories),
"session_history_length": len(history)
}
def _extract_and_save_facts(self, user_id: str, query: str, answer: str):
"""Extract memorable facts from the interaction."""
# Simplified - in production, make this async
prompt = ChatPromptTemplate.from_template("""
Extract any facts worth remembering about the user from this exchange.
Return as JSON list of strings, or empty list if nothing notable.
User: {query}
Assistant: {answer}
Facts (JSON list):""")
chain = prompt | self.fast_llm
try:
result = chain.invoke({"query": query, "answer": answer})
facts = json.loads(result.content)
for fact in facts:
self._save_user_memory(user_id, fact, "fact")
except Exception:
pass # Don't fail the request if extraction fails
# Usage
rag = ProductionConversationalRAG(
redis_url="redis://localhost:6379",
pinecone_index="my-rag-index",
openai_api_key="sk-..."
)
# First query
result = rag.query(
session_id="session_abc",
user_id="user_123",
query="What is the vacation policy?"
)
print(result["answer"])
# Follow-up (uses session history)
result = rag.query(
session_id="session_abc",
user_id="user_123",
query="Can I carry days over?"
)
print(result["answer"])
# Later session (uses long-term memory)
result = rag.query(
session_id="session_xyz", # New session
user_id="user_123", # Same user
query="What was that vacation thing I asked about?"
)
print(result["answer"])

Memory transforms RAG from a search tool into an intelligent assistant. Here is what we covered:
| Pattern | Use When | Watch Out For |
|---|---|---|
| Buffer | Short conversations (<10 turns) | Unbounded growth |
| Window | Recent context is most important | Hard cutoff loses context |
| Summary | Long conversations, gist matters | Summarization cost and latency |
| Summary Buffer | Need both recency and history | Configuration complexity |
| System | Strengths | Best For |
|---|---|---|
| Zep | Knowledge graphs, auto-extraction | Sophisticated relationship tracking |
| MemGPT | Agent-controlled memory, tiered storage | Autonomous long-running agents |
| LangMem | LangGraph integration, custom logic | Complex stateful workflows |
| Custom | Full control, specific requirements | Unique architectures |
In Part 9, we will explore Multimodal RAG: handling images, audio, video, and mixed-media documents. When your knowledge base includes more than just text, retrieval gets interesting.
Memory and multimodal together create truly capable AI assistants. But first, you need to handle the text case well. The patterns in this article are your foundation.
This article is Part 8 of our "Building RAG Systems" series. Start with Part 1: RAG Foundations if you are new to RAG, or continue to Part 9: Multimodal RAG to learn about handling non-text content.