GitHub user alnzng created a discussion: Vector Store Integration for Flink Agents
## Overview
Vector stores enable agents to perform semantic search and knowledge retrieval
over large document collections. This unlocks powerful new agent patterns
including **RAG (Retrieval-Augmented Generation)** where agents search relevant
documents before generating responses, **knowledge base agents** that answer
questions from company documentation, and **context-aware conversations** that
maintain relevant context across long interactions. Vector stores will also
serve as a foundation for future long-term memory capabilities.

We propose adding **Vector Store** as a new resource type following the same
architectural pattern as ChatModel, with multiple built-in implementations to
support different use cases and deployment preferences.
## Design Overview
```
User Input → RAG Agent → Vector Search → Context Retrieval → Enhanced Generation → Response

Detailed Event Flow:

┌────────────┐   ┌───────────────────┐   ┌─────────────────────────┐   ┌──────────────────┐   ┌───────────────────┐   ┌─────────────┐
│ InputEvent │──▶│ VectorSearchEvent │──▶│ VectorSearchResultEvent │──▶│ ChatRequestEvent │──▶│ ChatResponseEvent │──▶│ OutputEvent │
└────────────┘   └───────────────────┘   └─────────────────────────┘   └──────────────────┘   └───────────────────┘   └─────────────┘
                           │                          ▲                         │                       ▲
                           ▼                          │                         ▼                       │
                  ┌─────────────────┐                 │               ┌──────────────────┐              │
                  │  Vector Store   │─────────────────┘               │    Chat Model    │──────────────┘
                  │     Backend     │                                 │   (OpenAI/etc)   │
                  └─────────────────┘                                 └──────────────────┘
                           │
                           ▼
                  ┌─────────────────┐
                  │ Embedding Model │
                  │ (text→vectors)  │
                  └─────────────────┘
```
## APIs
### Base Classes
```python
class BaseVectorStoreConnection(Resource, ABC):
    """Base class for vector store connections.

    Manages connection configuration such as endpoints, authentication, and
    connection parameters.
    """

    @abstractmethod
    def search(self, query: str, k: int, **kwargs) -> List[VectorSearchResult]:
        """Perform semantic search."""
        pass


class BaseVectorStoreSetup(Resource, ABC):
    """Base class for vector store setup.

    Similar to BaseChatModelSetup, configures the vector store with
    specific collections, embedding models, and search parameters.

    Attributes
    ----------
    connection_name : str
        Name of the connection resource to use
    embedding_model : str
        Name/identifier of the embedding model to use
    """

    connection_name: str
    embedding_model: str
```
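To make the `search` contract concrete, here is a self-contained sketch of what a backend implementing it might look like. Everything here is hypothetical and stubbed for illustration: `InMemoryVectorStoreConnection`, `Document`, the local `VectorSearchResult` dataclass, and the toy bag-of-words "embedding" are stand-ins, not part of the proposal or the framework.

```python
import math
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class Document:
    """Stand-in for a stored document; shape is an assumption."""
    content: str


@dataclass
class VectorSearchResult:
    """Stand-in for the proposed result type: a document plus a score."""
    document: Document
    score: float


def cosine_similarity(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemoryVectorStoreConnection:
    """Toy backend mirroring the proposed `search(query, k)` contract:
    embed the query, rank stored documents by cosine similarity."""

    def __init__(self, embed: Callable[[str], List[float]]):
        self._embed = embed
        self._docs: List[Tuple[List[float], Document]] = []

    def add(self, content: str) -> None:
        self._docs.append((self._embed(content), Document(content)))

    def search(self, query: str, k: int, **kwargs) -> List[VectorSearchResult]:
        q = self._embed(query)
        scored = [VectorSearchResult(doc, cosine_similarity(q, emb))
                  for emb, doc in self._docs]
        scored.sort(key=lambda r: r.score, reverse=True)
        return scored[:k]


# Hypothetical usage with a toy bag-of-words "embedding model"
VOCAB = ["flink", "agents", "vector", "search"]

def toy_embed(text: str) -> List[float]:
    words = text.lower().split()
    return [float(words.count(w)) for w in VOCAB]

conn = InMemoryVectorStoreConnection(toy_embed)
conn.add("flink agents overview")
conn.add("vector search basics")
top = conn.search("how does vector search work", k=1)
```

A real implementation (e.g. Elasticsearch-backed) would delegate embedding to the configured embedding model and ranking to the backend, but the return shape — top-k documents with scores — would be the same.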
### Event Types
New event types for vector operations:
```python
class VectorSearchEvent(Event):
    """Event to request semantic search from vector store.

    Attributes
    ----------
    vector_store : str
        Name of the vector store resource to use
    query : str
        Search query text
    k : int
        Number of results to return (default: 5)
    """

    vector_store: str
    query: str
    k: int = 5


class VectorSearchResultEvent(Event):
    """Event containing vector search results.

    Attributes
    ----------
    request : VectorSearchEvent
        The original search request
    results : List[VectorSearchResult]
        Search results with documents and scores
    """

    request: VectorSearchEvent
    results: List[VectorSearchResult]
```
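The request/response pairing can be sketched with plain dataclasses (the real classes would extend the framework's `Event`; the `Document` and `VectorSearchResult` shapes below are assumptions for illustration). The key design point is that the result event carries the original request, so a downstream action can correlate results with the query that produced them without keeping extra state.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Document:
    content: str


@dataclass
class VectorSearchResult:
    document: Document
    score: float


@dataclass
class VectorSearchEvent:
    vector_store: str
    query: str
    k: int = 5  # default result count from the proposal


@dataclass
class VectorSearchResultEvent:
    # Carrying the request lets handlers access the original query
    request: VectorSearchEvent
    results: List[VectorSearchResult]


request = VectorSearchEvent(vector_store="knowledge_base",
                            query="What is Flink?", k=3)
response = VectorSearchResultEvent(
    request=request,
    results=[VectorSearchResult(Document("Flink is a stream processor."), 0.92)],
)
```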
### RAG Agent Example
```python
class RAGAgent(Agent):
    @vector_store_connection
    @staticmethod
    def elasticsearch_connection() -> Tuple[Type[BaseVectorStoreConnection], Dict[str, Any]]:
        """Elasticsearch connection for vector store."""
        return ElasticsearchConnection, {
            "name": "es_conn",
            "hosts": ["http://localhost:9200"]
        }

    @vector_store
    @staticmethod
    def knowledge_base() -> Tuple[Type[BaseVectorStoreSetup], Dict[str, Any]]:
        """Knowledge base vector store using Elasticsearch."""
        return ElasticsearchVectorStore, {
            "name": "knowledge_base",
            "connection": "es_conn",
            "embedding_model": "<your_embedding_model_here>"
        }

    @chat_model_server
    @staticmethod
    def openai_connection() -> Tuple[Type[BaseChatModelConnection], Dict[str, Any]]:
        """OpenAI connection for chat model."""
        return OpenAIChatModelConnection, {
            "name": "openai_conn",
            "api_key": "<your_openai_api_key_here>"
        }

    @chat_model
    @staticmethod
    def rag_chat_model() -> Tuple[Type[BaseChatModelSetup], Dict[str, Any]]:
        """Chat model for RAG responses."""
        return OpenAIChatModelSetup, {
            "name": "rag_model",
            "connection": "openai_conn",
            "model": "<your_chat_model_here>"
        }

    @action(InputEvent)
    @staticmethod
    def search_knowledge(event: InputEvent, ctx: RunnerContext):
        # Search for relevant context first
        ctx.send_event(VectorSearchEvent(
            vector_store="knowledge_base",
            query=event.input,
            k=3
        ))

    @action(VectorSearchResultEvent)
    @staticmethod
    def generate_response(event: VectorSearchResultEvent, ctx: RunnerContext):
        # Use search results as context for chat model
        context = "\n".join([r.document.content for r in event.results])
        enhanced_prompt = f"Context: {context}\nQuestion: {event.request.query}"
        ctx.send_event(ChatRequestEvent(
            model="rag_model",
            messages=[ChatMessage(content=enhanced_prompt)]
        ))
```
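The prompt assembly inside `generate_response` can be checked on its own, without the framework. This is a standalone sketch of that formatting step; `build_rag_prompt` is a hypothetical helper extracted for illustration, not part of the proposed API.

```python
from typing import List


def build_rag_prompt(contents: List[str], question: str) -> str:
    """Join retrieved document contents into a context block and append
    the user's question, mirroring the f-string in `generate_response`."""
    context = "\n".join(contents)
    return f"Context: {context}\nQuestion: {question}"


prompt = build_rag_prompt(
    ["Flink is a stream processor.", "Agents react to events."],
    "What is Flink?",
)
```

One design consequence worth noting: with `k=3` and long documents, this naive concatenation can exceed the chat model's context window, so a production agent would likely need truncation or token budgeting at this step.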
GitHub link: https://github.com/apache/flink-agents/discussions/143