GitHub user coderplay closed a discussion: Flink-Agents Memory Design
## Introduction
Memory is fundamental to human intelligence—it shapes identity, guides
decisions, and enables learning, adaptation, and meaningful relationships. In
communication, memory allows us to recall past interactions, infer preferences,
and maintain coherent, context-rich exchanges over long periods. In contrast,
current AI agents powered by large language models (LLMs) are limited by fixed
context windows and lack persistent memory, leading to forgetfulness,
contradictions, and a diminished user experience. Even as LLMs’ context windows
grow, they cannot match the human ability to retain and retrieve relevant
information across sessions and topics. This limitation is especially
problematic in domains requiring continuity and trust, such as healthcare and
education. To overcome these challenges, AI agents need robust memory systems
that can selectively store, consolidate, and retrieve important
information—mirroring human cognition. Such systems will enable AI agents to
maintain consistent personas, track evolving user preferences, and build upon
prior exchanges, transforming them into reliable, long-term collaborators.
## Proposal
To address the critical limitations of current AI agents—the lack of
persistent memory and poor long-term relationship building—we propose a
distributed memory system for flink-agents that mirrors human cognitive
processes.
The solution implements three memory types: **Sensory Memory** (internal
events), **Short-term Memory** (recent history), and **Long-term Memory**
(semantic retrieval). It leverages Flink's distributed state management with a
hybrid backend combining history store and vector store, directly addressing
LLM context window limitations through persistent, searchable memory.
Additionally, **Knowledge** provides shared long-term memory accessible across
all agent instances, stored externally without Flink checkpointing, enabling
domain-specific expertise and consistent collaboration in trust-critical
domains.
## Memory
### High-Level Architecture
The Flink-Agents Memory System introduces a **hybrid state backend** that
combines two specialized state backends:
- **History Store**: Tracks all memory operations for audit trails and
handles retrieval of short-term memories. The default implementation of the
history store uses Flink’s built-in state backends.
- **Vector Store**: Provides vector search capabilities for long-term semantic
memory retrievals. The default implementation is based on embedded Lucene
instances.
This hybrid approach optimizes both storage efficiency and search performance,
unlike traditional single-backend solutions.
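To make the routing concrete, here is a minimal sketch (all class names hypothetical) of how a hybrid backend might fan writes out to both engines while serving sequential reads from the history side and semantic reads from the vector side. Plain dicts and keyword overlap stand in for the real Flink state backend and Lucene index:

```python
from typing import Any, Dict, List

class HistoryStore:
    """Illustrative stand-in: appends operations in order (real default: Flink state backend)."""
    def __init__(self):
        self._log: List[Dict[str, Any]] = []

    def append(self, record: Dict[str, Any]) -> None:
        self._log.append(record)

    def last(self, n: int) -> List[Dict[str, Any]]:
        return self._log[-n:]

class VectorStore:
    """Illustrative stand-in: keyword overlap instead of real embeddings (real default: embedded Lucene)."""
    def __init__(self):
        self._docs: Dict[str, str] = {}

    def insert(self, doc_id: str, text: str) -> None:
        self._docs[doc_id] = text

    def search(self, query: str, limit: int) -> List[str]:
        q = set(query.lower().split())
        scored = [(len(q & set(t.lower().split())), d) for d, t in self._docs.items()]
        scored.sort(reverse=True)
        return [d for s, d in scored[:limit] if s > 0]

class HybridBackend:
    """Routes sequential reads to the history store and semantic reads to the vector store."""
    def __init__(self):
        self.history = HistoryStore()
        self.vectors = VectorStore()

    def add(self, doc_id: str, text: str) -> None:
        # Writes go to both engines: audit trail plus searchable index.
        self.history.append({"id": doc_id, "memory": text})
        self.vectors.insert(doc_id, text)

backend = HybridBackend()
backend.add("m1", "user prefers dark mode")
backend.add("m2", "user lives in Berlin")
print(backend.history.last(1))   # most recent operation
print(backend.vectors.search("dark mode preference", limit=1))
```

The point of the split is that each read path hits only the engine optimized for it; writes pay a dual-insert cost, which the design accepts in exchange for the audit trail.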
```mermaid
graph LR
subgraph "Flink Cluster"
subgraph "Task Manager 1"
A1[Agent Instance 1]
M1[Memory 1]
subgraph "Hybrid State Backend 1"
S1_R[History Store]
S1_L[Vector Store]
end
end
subgraph "Task Manager 2"
A2[Agent Instance 2]
M2[Memory 2]
subgraph "Hybrid State Backend 2"
S2_R[History Store]
S2_L[Vector Store]
end
end
subgraph "Task Manager N"
AN[Agent Instance N]
MN[Memory N]
subgraph "Hybrid State Backend N"
SN_R[History Store]
SN_L[Vector Store]
end
end
end
A1 --> M1
A2 --> M2
AN --> MN
M1 --> S1_R
M1 --> S1_L
M2 --> S2_R
M2 --> S2_L
MN --> SN_R
MN --> SN_L
style S1_R fill:#ffebee
style S1_L fill:#e8f5e8
style S2_R fill:#ffebee
style S2_L fill:#e8f5e8
style SN_R fill:#ffebee
style SN_L fill:#e8f5e8
```
### Memory Types with Hybrid Storage
The system supports different types of memories, each with specialized storage
in our **hybrid state backend**. **Sensory memory operates internally and is
not exposed to users**:
```mermaid
graph TD
subgraph "Memory Types"
SM["Sensory Memory<br/>Agent Events<br/>MapState<br/>Internal Only"]
STM["Short-term Memory<br/>Recent History<br/>History Storage<br/>get_history(n)"]
LTM["Long-term Memory<br/>Semantic Search<br/>Vector Storage<br/>search(query)"]
end
subgraph "Hybrid State Backend"
HB_R["History Store"]
HB_L["Vector Store"]
end
STM -.-> HB_R
LTM -.-> HB_L
style SM fill:#e1f5fe
style STM fill:#f3e5f5
style LTM fill:#e8f5e8
style HB_R fill:#ffebee
style HB_L fill:#e8f5e8
```
### Memory Types and Implementation
The system supports three distinct types of memories, each serving different
purposes and using different storage mechanisms:
#### Sensory Memory (Event Processing)
Sensory memory captures real-time events from agents and stores them in Flink's
keyed state using `MapState`. This represents the immediate sensory input that
agents receive from their environment. **Sensory memory is completely invisible
to users** and operates automatically in the background.
**Characteristics:**
- **Storage**: Flink MapState (in-memory + checkpointed)
- **Retention**: Configurable window size
- **Access Pattern**: Lookup
- **Performance**: Nanosecond-level latency
- **Visibility**: Internal only - no user API access
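As an illustration of the retention behavior, the following sketch (hypothetical `SensoryBuffer` class, not the actual implementation) keeps a bounded, ordered event map per key and evicts the oldest entries once the configured window is exceeded, much as a MapState-backed buffer might:

```python
from collections import OrderedDict

class SensoryBuffer:
    """Illustrative stand-in for the MapState-backed sensory memory.
    Keeps only the newest `window_size` events; never exposed to user code."""
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self._events: "OrderedDict[int, str]" = OrderedDict()
        self._seq = 0

    def record(self, event: str) -> None:
        self._seq += 1
        self._events[self._seq] = event
        # Evict from the front once the retention window is exceeded.
        while len(self._events) > self.window_size:
            self._events.popitem(last=False)

buf = SensoryBuffer(window_size=2)
for e in ["e1", "e2", "e3"]:
    buf.record(e)
print(list(buf._events.values()))  # only the two newest events survive
```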
#### Short-term Memory (Recent History)
Short-term memory maintains a configurable history of recent agent interactions
and experiences. It uses history storage for persistence and provides fast
access to recent memories.
**Characteristics:**
- **Storage**: History storage (local to each TaskManager)
- **Retention**: Persistent, configurable TTL
- **Access Pattern**: Sequential, indexed by time
- **Performance**: Sub-millisecond latency
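A deque-based sketch of these access characteristics (hypothetical `ShortTermMemory` class; the real store is TaskManager-local history storage), showing TTL-based expiry and time-ordered retrieval:

```python
import time
from collections import deque

class ShortTermMemory:
    """Illustrative sketch: time-indexed recent history with a TTL."""
    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl = ttl_seconds
        self._items = deque()  # (timestamp, memory) in arrival order

    def add(self, memory: str, now: float = None) -> None:
        self._items.append((now if now is not None else time.time(), memory))

    def get_history(self, n: int = 10, now: float = None) -> list:
        now = now if now is not None else time.time()
        # Drop expired entries from the front, then return the n newest.
        while self._items and now - self._items[0][0] > self.ttl:
            self._items.popleft()
        return [m for _, m in list(self._items)[-n:]]

stm = ShortTermMemory(ttl_seconds=10.0)
stm.add("greeted the user", now=0.0)
stm.add("answered a Flink question", now=5.0)
print(stm.get_history(n=1, now=12.0))  # the first entry has expired
```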
#### Long-term Memory (Semantic Search)
Long-term memory provides semantic search capabilities using a vector storage
backend. It stores memories with embeddings for similarity-based retrieval.
**Characteristics:**
- **Storage**: Vector storage backend
- **Retention**: Persistent, configurable TTL
- **Access Pattern**: Semantic search, similarity-based
- **Performance**: Optimized for search operations
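The retrieval principle can be shown with a toy bag-of-words "embedding" in place of a real model; `embed` and `cosine` below are illustrative only, and a production backend would use learned embeddings and an index rather than a linear scan:

```python
import math
from collections import Counter

def embed(text: str) -> Counter:
    """Toy bag-of-words 'embedding'; a real backend would call an embedding model."""
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0

memories = {
    "m1": "user asked about checkpoint intervals",
    "m2": "user mentioned a birthday in June",
}
query_vec = embed("user question about checkpoint settings")
ranked = sorted(memories, key=lambda k: cosine(query_vec, embed(memories[k])), reverse=True)
print(ranked[0])  # m1 is semantically closest to the query
```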
### Memory API Design
The Memory API provides a unified interface for different types of memories.
**Sensory memory is not exposed through the API** and operates automatically in
the background.
Memory inherits from Flink's State API. It behaves like other Flink KeyedState
implementations and can be saved to StateBackends and checkpointed for fault
tolerance and recovery.
```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class MemoryItem(BaseModel):
    id: str = Field(..., description="The unique identifier for the text data")
    memory: str = Field(..., description="The memory deduced from the text data")
    metadata: Optional[Dict[str, Any]] = Field(
        None, description="Additional metadata for the text data"
    )
    created_at: Optional[str] = Field(
        None, description="The timestamp when the memory was created"
    )
    updated_at: Optional[str] = Field(
        None, description="The timestamp when the memory was updated"
    )


class MemoryAPI(ABC):
    """
    Core memory interface for Flink agents.

    This API provides access to different types of memories:
    - Short-term memory: recent history via the get_history(n) method
    - Long-term memory: semantic search via the search() method

    Note: Sensory memory is internal and not exposed through this API.
    """

    @abstractmethod
    def add(self, memory: MemoryItem) -> str:
        """Store a memory item."""

    @abstractmethod
    def update(self, memory_id: str, data: MemoryItem) -> MemoryItem:
        """Update a memory by ID."""

    @abstractmethod
    def delete(self, memory_id: str) -> None:
        """Delete a memory by ID."""

    @abstractmethod
    def get(self, memory_id: str) -> Optional[MemoryItem]:
        """Retrieve a memory by ID."""

    @abstractmethod
    def get_history(self, n: int = 10) -> List[MemoryItem]:
        """Get the last n memories."""

    @abstractmethod
    def search(self, query: str, limit: int = 10) -> List[MemoryItem]:
        """Search memories by semantic similarity."""
```
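A minimal in-memory implementation of this method set might look like the following sketch; it uses plain dicts and naive keyword-overlap search in place of persistent state and real embeddings, and the class name is hypothetical:

```python
from typing import Dict, List, Optional


class InMemoryMemory:
    """Hypothetical in-memory implementation of the MemoryAPI method set.
    Real backends persist to Flink state; search here is naive keyword overlap."""

    def __init__(self):
        self._items: Dict[str, dict] = {}  # id -> memory record
        self._order: List[str] = []        # insertion order, for get_history

    def add(self, item: dict) -> str:
        self._items[item["id"]] = item
        self._order.append(item["id"])
        return item["id"]

    def update(self, memory_id: str, data: dict) -> dict:
        self._items[memory_id].update(data)
        return self._items[memory_id]

    def delete(self, memory_id: str) -> None:
        self._items.pop(memory_id, None)
        if memory_id in self._order:
            self._order.remove(memory_id)

    def get(self, memory_id: str) -> Optional[dict]:
        return self._items.get(memory_id)

    def get_history(self, n: int = 10) -> List[dict]:
        return [self._items[i] for i in self._order[-n:] if i in self._items]

    def search(self, query: str, limit: int = 10) -> List[dict]:
        q = set(query.lower().split())
        scored = sorted(
            self._items.values(),
            key=lambda m: len(q & set(m["memory"].lower().split())),
            reverse=True,
        )
        return scored[:limit]


mem = InMemoryMemory()
mem.add({"id": "m1", "memory": "user prefers concise answers"})
mem.add({"id": "m2", "memory": "user works on streaming pipelines"})
print(mem.get_history(1)[0]["id"])                          # m2
print(mem.search("streaming pipelines", limit=1)[0]["id"])  # m2
```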
## Knowledge
Knowledge is a special type of long-term memory that is shared across all Agent
instances. It provides access to external, pre-built knowledge sources through
remote vector database connections, offering domain-specific knowledge that can
enhance agent responses. Unlike individual agent memories, knowledge is
globally accessible and persistent across the entire Flink cluster. Since
knowledge is independent from a particular agent,
it does not require Flink state checkpointing.
### Knowledge Architecture
```mermaid
graph TB
subgraph "Flink Cluster"
subgraph "Agent Instances"
A1[Agent 1]
A2[Agent 2]
AN[Agent N]
end
end
subgraph "External Knowledge Store"
KB1[Knowledge Base 1<br/>Domain A]
KB2[Knowledge Base 2<br/>Domain B]
KB3[Knowledge Base N<br/>Domain N]
end
A1 -.-> KB1
A1 -.-> KB2
A1 -.-> KB3
A2 -.-> KB1
A2 -.-> KB2
A2 -.-> KB3
AN -.-> KB1
AN -.-> KB2
AN -.-> KB3
style KB1 fill:#fff3e0
style KB2 fill:#fff3e0
style KB3 fill:#fff3e0
```
### Knowledge Base Implementation
```python
class KnowledgeBase:
    """
    Shared knowledge base integration.

    Provides access to pre-built knowledge sources through remote
    vector database connections. This is a special type of long-term
    memory that is shared across all agent instances in the cluster.
    """

    def __init__(self, service_url: str, api_key: str = None):
        self.service_url = service_url
        self.api_key = api_key
        self.client = VectorDBClient(service_url, api_key)

    def search(self, query: str, knowledge_source: str = None, limit: int = 10) -> List[KnowledgeItem]:
        """
        Search external knowledge base.

        Args:
            query: Search query string
            knowledge_source: Specific knowledge source to search (optional)
            limit: Maximum number of results to return

        Returns:
            List of knowledge items with relevance scores
        """
        response = self.client.search(
            query=query,
            source=knowledge_source,
            limit=limit
        )
        return [KnowledgeItem.from_response(item) for item in response.results]

    def get_sources(self) -> List[str]:
        """Get available knowledge sources."""
        return self.client.list_sources()

    def get_source_info(self, source_name: str) -> Dict[str, Any]:
        """Get information about a specific knowledge source."""
        return self.client.get_source_info(source_name)


class KnowledgeItem:
    """Represents a knowledge item from external sources."""

    def __init__(self, content: str, source: str, score: float, metadata: Dict[str, Any] = None):
        self.content = content
        self.source = source
        self.score = score
        self.metadata = metadata or {}

    @classmethod
    def from_response(cls, item) -> "KnowledgeItem":
        # Field names assumed from the vector DB client's response shape.
        return cls(item.content, item.source, item.score, getattr(item, "metadata", None))
```
## Memory Compaction
Memory compaction addresses a fundamental limitation of large language models
(LLMs): their fixed context windows. While the Memory API can store vast
amounts of memories, limited only by disk capacity, LLM context windows range
from a few thousand to a few million tokens.
### Compaction Strategies
Memory compaction reduces memory volume while preserving essential information
through two main approaches:
#### 1. Scoring-Based Compaction
Scoring-based compaction evaluates the relevance and importance of each memory
item using mechanisms such as recency, frequency of access, semantic similarity
to the current query, and user-defined importance markers. By ranking memories
based on these criteria, the system retains only the most significant ones.
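One way to combine those signals is a weighted score; the weights and field names below are arbitrary illustrations, not part of the design:

```python
import time

def memory_score(item: dict, query_terms: set, now: float) -> float:
    """Hypothetical scoring function combining the signals named above:
    recency, access frequency, keyword overlap with the query, and an
    explicit importance marker. Weights are arbitrary for illustration."""
    age_hours = max((now - item["created_at"]) / 3600.0, 0.0)
    recency = 1.0 / (1.0 + age_hours)                         # newer is better
    frequency = min(item.get("access_count", 0) / 10.0, 1.0)  # capped at 1.0
    overlap = len(query_terms & set(item["memory"].lower().split())) / max(len(query_terms), 1)
    importance = item.get("importance", 0.5)
    return 0.3 * recency + 0.2 * frequency + 0.3 * overlap + 0.2 * importance

def compact_by_score(items: list, query: str, keep: int, now: float = None) -> list:
    """Rank memories by score and retain only the top `keep`."""
    now = now if now is not None else time.time()
    terms = set(query.lower().split())
    ranked = sorted(items, key=lambda i: memory_score(i, terms, now), reverse=True)
    return ranked[:keep]

items = [
    {"memory": "user likes tea", "created_at": 0.0, "access_count": 1, "importance": 0.2},
    {"memory": "user deadline is friday", "created_at": 0.0, "access_count": 9, "importance": 0.9},
]
print(compact_by_score(items, "upcoming deadline", keep=1, now=7200.0)[0]["memory"])
```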
#### 2. Summarization-Based Compaction
Summarization-based compaction uses LLMs to create intelligent summaries of
memory groups. This approach groups similar memories together and generates
concise summaries that capture essential information from multiple related
memories.
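A sketch of the group-then-summarize loop; the `summarize_group` stub simply joins text so the example runs offline, where a real implementation would prompt an LLM for an abstractive summary, and the grouping rule is a deliberately crude illustration:

```python
def summarize_group(memories: list) -> str:
    """Stand-in for an LLM call: join and truncate. A real implementation
    would prompt the model for a genuine abstractive summary."""
    return "Summary: " + "; ".join(memories)[:200]

def compact_by_summarization(memories: list, group_key) -> list:
    """Group related memories, then replace each group with one summary item."""
    groups: dict = {}
    for m in memories:
        groups.setdefault(group_key(m), []).append(m)
    return [summarize_group(g) for g in groups.values()]

memories = [
    "user asked about watermark strategy",
    "user asked about watermark lateness",
    "user shared a vacation photo",
]
# Hypothetical grouping rule: bucket by the first two words of the memory.
# A real system would cluster by embedding similarity instead.
compacted = compact_by_summarization(memories, lambda m: " ".join(m.split()[:2]))
print(len(compacted))  # 2 groups -> 2 summaries
```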
### Integration
The compaction functionality is integrated into the Agent API to provide
seamless compaction capabilities. This integration ensures that agents can
efficiently access relevant memory information without exceeding context window
limitations, resulting in optimized context window usage, faster LLM inference,
and reduced token costs.
## Data Flow
### Memory Addition Flow
```mermaid
sequenceDiagram
participant Agent
participant Memory
participant LLM
participant Embeddings
participant VectorStore
participant HistoryStore
participant MemoryCompactor
Agent->>Memory: add(messages, agent_id)
Memory->>LLM: extract_facts(messages)
LLM-->>Memory: extracted_facts[]
loop For each fact
Memory->>Embeddings: embed(fact)
Embeddings-->>Memory: vector
Memory->>VectorStore: search_similar(vector)
VectorStore-->>Memory: existing_memories[]
end
Memory->>LLM: update_memory_decisions(facts, existing)
LLM-->>Memory: actions[ADD/UPDATE/DELETE]
loop For each action
alt ADD
Memory->>VectorStore: insert(vector, metadata)
Memory->>HistoryStore: add_history(memory_id, null, new_memory, "ADD")
else UPDATE
Memory->>VectorStore: update(memory_id, vector, metadata)
Memory->>HistoryStore: add_history(memory_id, old_memory, new_memory, "UPDATE")
else DELETE
Memory->>VectorStore: delete(memory_id)
Memory->>HistoryStore: add_history(memory_id, old_memory, null, "DELETE")
end
end
Memory-->>Agent: memory_results[]
alt triggers_compaction
Agent->>MemoryCompactor: compact()
MemoryCompactor-->>Agent: compaction_complete
end
```
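The ADD/UPDATE/DELETE branch of the diagram could be driven by a loop like the following sketch; the action format and the store methods are assumptions for illustration only:

```python
def apply_memory_actions(actions, vector_store, history_store):
    """Sketch of the action loop from the diagram. `actions` is a hypothetical
    list of LLM decisions, each a dict with an 'op' of ADD/UPDATE/DELETE plus
    the affected memory; the stores expose the operations named in the diagram."""
    for a in actions:
        if a["op"] == "ADD":
            vector_store.insert(a["id"], a["new"])
            history_store.append((a["id"], None, a["new"], "ADD"))
        elif a["op"] == "UPDATE":
            vector_store.update(a["id"], a["new"])
            history_store.append((a["id"], a["old"], a["new"], "UPDATE"))
        elif a["op"] == "DELETE":
            vector_store.delete(a["id"])
            history_store.append((a["id"], a["old"], None, "DELETE"))

class _DictStore:
    """Minimal in-memory stand-in for the vector store."""
    def __init__(self):
        self.data = {}
    def insert(self, k, v): self.data[k] = v
    def update(self, k, v): self.data[k] = v
    def delete(self, k): self.data.pop(k, None)

vs, log = _DictStore(), []  # a plain list stands in for the history store
apply_memory_actions(
    [{"op": "ADD", "id": "m1", "new": "likes tea"},
     {"op": "UPDATE", "id": "m1", "old": "likes tea", "new": "prefers coffee"}],
    vs, log,
)
print(vs.data["m1"], len(log))  # prefers coffee 2
```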
### Memory Search Flow
```mermaid
sequenceDiagram
participant Agent
participant Memory
participant Embeddings
participant VectorStore
Agent->>Memory: search(query)
Memory->>Embeddings: embed(query)
Embeddings-->>Memory: query_vector
Memory->>VectorStore: search(query_vector, filters)
VectorStore-->>Memory: similar_memories[]
Memory-->>Agent: search_results[]
```
## Integration in Agents
```python
class AgentWithMemory(Agent):
    """Flink agent with shared memory and knowledge integration."""

    def __init__(self, agent_id: str, knowledge_bases: List[KnowledgeBase] = None):
        super().__init__(agent_id)
        self.knowledge_bases = knowledge_bases or []

    def search_knowledge(self, query: str, source: str = None, limit: int = 10) -> List[KnowledgeItem]:
        """
        Search across all available shared knowledge bases.

        Args:
            query: Search query
            source: Specific knowledge source (optional)
            limit: Maximum results per source

        Returns:
            Combined results from all shared knowledge bases
        """
        all_results = []
        for kb in self.knowledge_bases:
            if source is None or source in kb.get_sources():
                results = kb.search(query, knowledge_source=source, limit=limit)
                all_results.extend(results)
        # Sort by relevance score
        all_results.sort(key=lambda x: x.score, reverse=True)
        return all_results[:limit]

    def process_event(self, event: AgentEvent):
        """Process event with memory and knowledge integration."""
        # Get memory context
        recent_history = self.memory.get_history(5)
        relevant_memories = self.memory.search(event.content, limit=10)
        compacted_context = self.memory_compactor.get()
        # Get knowledge context
        knowledge_context = self.search_knowledge(event.content, limit=5)
        # Generate response with compacted context
        response = self.generate_response(
            event,
            recent_history,
            relevant_memories,
            compacted_context,
            knowledge_context,
        )
        # Store interaction
        interaction_memory = MemoryItem(
            id=f"interaction_{time.time()}",
            memory=f"Event: {event.content}, Response: {response}",
            metadata={
                "timestamp": time.time(),
                "agent_id": self.agent_id,
            },
            created_at=str(time.time()),
        )
        self.memory.add(interaction_memory)
        if self.triggers_compaction():  # e.g. a size or TTL threshold check
            self.memory_compactor.compact()
        return response
```
## Execution Plan
The implementation will be executed in three phases:
### Phase 1: Core Memory Foundation
- **Short-term Memory**: History-based storage using MapState/ListState
- **Memory Compaction**: Scoring and summarization strategies
- **Knowledge Integration**: External knowledge base connections
- **Architecture**: Memory delegates to MapState/ListState (no Flink State
inheritance)
- **Long-term Memory**: Replaced by compacted memory results
### Phase 2: Flink State Integration
- **Memory State Inheritance**: Memory inherits from Flink's State API
- **State Backend Integration**: Full integration with Flink state backends
- **Checkpointing Support**: Fault tolerance through Flink checkpointing
### Phase 3: Hybrid State Backend
- **Hybrid State Backend**: History Store and Vector Store.
## Conclusion
The Flink-Agents Memory System introduces a **revolutionary hybrid state
backend** that combines RocksDB/ForSt and Lucene instances within a unified
state management framework. This dual-engine approach eliminates the need for
external vector databases while providing optimal performance for both
historical data storage and semantic search operations.
### Key Innovations
1. **Hybrid State Backend**: The core innovation combining RocksDB/ForSt
instances for historical data and Lucene instances for vector search
2. **Unified Interface**: Single API that automatically routes operations to
the appropriate storage engine
3. **Distributed Architecture**: Each TaskManager hosts its own hybrid backend
with isolated RocksDB/ForSt and Lucene instances
4. **Coordinated Checkpointing**: Ensures consistent state recovery across both
storage engines
This design provides a solid foundation for building sophisticated,
memory-aware agents in Apache Flink environments, with the hybrid state backend
serving as the cornerstone of the architecture.
GitHub link: https://github.com/apache/flink-agents/discussions/92