GitHub user alnzng closed a discussion: Vector Store and Embedding Model Integration for Flink Agents
## Overview
Vector stores and embedding models together enable agents to perform semantic
search and knowledge retrieval over large document collections. This powerful
combination unlocks new agent patterns including **RAG (Retrieval-Augmented
Generation)** where agents search relevant documents before generating
responses, **knowledge base agents** that answer questions from company
documentation, and **context-aware conversations** that maintain relevant
context across long interactions. These capabilities will also serve as a
foundation for future long-term memory features.

We propose adding **Vector Store** and **Embedding Model** as new resource
types following the same architectural pattern as ChatModel, with multiple
built-in implementations to support different use cases and deployment
preferences.
## Design Overview
```
User Input → RAG Agent → Query Embedding → Vector Search → Context Retrieval → Enhanced Generation → Response

Detailed Event Flow:

┌─────────────┐    ┌──────────────────┐    ┌─────────────────────┐    ┌─────────────────┐    ┌──────────────────┐    ┌─────────────┐
│ InputEvent  │───▶│ VectorSearchEvent│───▶│ VectorSearchResult  │───▶│ ChatRequestEvent│───▶│ ChatResponseEvent│───▶│ OutputEvent │
│             │    │                  │    │ Event               │    │                 │    │                  │    │             │
└─────────────┘    └──────────────────┘    └─────────────────────┘    └─────────────────┘    └──────────────────┘    └─────────────┘
                            │                         ▲                       │                        ▲
                            ▼                         │                       ▼                        │
                   ┌─────────────────┐                │              ┌─────────────────┐               │
                   │ Vector Store    │────────────────┘              │ Chat Model      │───────────────┘
                   │ Backend         │                               │ (OpenAI/etc)    │
                   └─────────────────┘                               └─────────────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │ Embedding Model │
                   │ (text→vectors)  │
                   └─────────────────┘
```
## APIs
### Base Classes
```python
from abc import ABC, abstractmethod
from typing import List

# Resource and VectorSearchResult are provided by the Flink Agents API.


class BaseEmbeddingModelConnection(Resource, ABC):
    """Base class for embedding model connections.

    Similar to BaseChatModelConnection, manages connection configuration
    for embedding service providers (OpenAI, Azure, local models, etc.).
    """

    @abstractmethod
    def embed_query(self, text: str) -> List[float]:
        """Generate the embedding for a single query text."""


class BaseEmbeddingModelSetup(Resource, ABC):
    """Base class for embedding model setup.

    Similar to BaseChatModelSetup, configures the embedding model with
    specific model names, dimensions, and processing parameters.

    Attributes:
    ----------
    connection : str
        Name of the connection resource to use
    model_name : str
        Name of the specific embedding model (e.g., "text-embedding-3-small")
    """

    connection: str
    model_name: str


class BaseVectorStoreConnection(Resource, ABC):
    """Base class for vector store connections.

    Manages connection configuration such as endpoints, authentication, and
    connection parameters.
    """

    @abstractmethod
    def search_with_embedding(
        self, embedding: List[float], k: int, **kwargs
    ) -> List[VectorSearchResult]:
        """Perform a search using a pre-computed embedding vector."""


class BaseVectorStoreSetup(Resource, ABC):
    """Base class for vector store setup.

    Similar to BaseChatModelSetup, configures the vector store with
    specific collections, embedding models, and search parameters.

    Attributes:
    ----------
    connection : str
        Name of the connection resource to use
    embedding_model : str
        Name of the embedding model resource to use
    """

    connection: str
    embedding_model: str
```
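To make the extension point concrete, a built-in connection could look like the sketch below. This is a minimal illustration, assuming the official `openai` Python client and a hypothetical `OpenAIEmbeddingConnection` class name; the actual built-ins would live in the framework's integrations packages.

```python
from typing import List

from openai import OpenAI


class OpenAIEmbeddingConnection(BaseEmbeddingModelConnection):
    """Sketch of an OpenAI-backed embedding connection (hypothetical)."""

    api_key: str

    def embed_query(self, text: str) -> List[float]:
        # One round-trip to the embeddings endpoint; in the full design the
        # model name would come from the paired BaseEmbeddingModelSetup.
        client = OpenAI(api_key=self.api_key)
        response = client.embeddings.create(
            model="text-embedding-3-small", input=text
        )
        return response.data[0].embedding
```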
### Event Types
New event types for vector operations:
```python
from typing import List


class VectorSearchEvent(Event):
    """Event to request a semantic search from a vector store.

    Attributes:
    ----------
    vector_store : str
        Name of the vector store resource to use
    query : str
        Search query text
    k : int
        Number of results to return (default: 5)
    """

    vector_store: str
    query: str
    k: int = 5


class VectorSearchResultEvent(Event):
    """Event containing vector search results.

    Attributes:
    ----------
    request : VectorSearchEvent
        The original search request
    results : List[VectorSearchResult]
        Search results with documents and scores
    """

    request: VectorSearchEvent
    results: List[VectorSearchResult]
```
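`VectorSearchResult` is referenced throughout but not yet specified in this proposal. The sketch below shows one plausible shape (an assumption for illustration, not part of the proposed API), pairing each retrieved document with its similarity score:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Document:
    """A retrieved document; hypothetical shape for illustration."""
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class VectorSearchResult:
    """One search hit: the matched document plus its similarity score."""
    document: Document
    score: float
```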
### RAG Agent Example
```python
import os
from typing import Any, Dict, Tuple, Type


class RAGAgent(Agent):
    @embedding_model_connection
    @staticmethod
    def openai_embedding_connection() -> Tuple[Type[BaseEmbeddingModelConnection], Dict[str, Any]]:
        """OpenAI connection for embedding models."""
        return OpenAIEmbeddingConnection, {
            "name": "openai_embed_conn",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }

    @embedding_model
    @staticmethod
    def text_embeddings() -> Tuple[Type[BaseEmbeddingModelSetup], Dict[str, Any]]:
        """Text embedding model for semantic search."""
        return OpenAIEmbeddingModel, {
            "name": "text_embeddings",
            "connection": "openai_embed_conn",
            "model_name": "text-embedding-3-small",
        }

    @vector_store_connection
    @staticmethod
    def elasticsearch_connection() -> Tuple[Type[BaseVectorStoreConnection], Dict[str, Any]]:
        """Elasticsearch connection for the vector store."""
        return ElasticsearchConnection, {
            "name": "es_conn",
            "hosts": ["http://localhost:9200"],
        }

    @vector_store
    @staticmethod
    def knowledge_base() -> Tuple[Type[BaseVectorStoreSetup], Dict[str, Any]]:
        """Knowledge base vector store using Elasticsearch."""
        return ElasticsearchVectorStore, {
            "name": "knowledge_base",
            "connection": "es_conn",
            "index": "documents",
            "embedding_model": "text_embeddings",  # reference to the embedding model resource
        }

    @chat_model_connection
    @staticmethod
    def openai_chat_connection() -> Tuple[Type[BaseChatModelConnection], Dict[str, Any]]:
        """OpenAI connection for the chat model."""
        return OpenAIChatModelConnection, {
            "name": "openai_chat_conn",
            "api_key": os.getenv("OPENAI_API_KEY"),
        }

    @chat_model
    @staticmethod
    def rag_chat_model() -> Tuple[Type[BaseChatModelSetup], Dict[str, Any]]:
        """Chat model for RAG responses."""
        return OpenAIChatModelSetup, {
            "name": "rag_model",
            "connection": "openai_chat_conn",
            "model": "gpt-4o-mini",
        }

    @action(InputEvent)
    @staticmethod
    def search_knowledge(event: InputEvent, ctx: RunnerContext):
        # Search for relevant context before generating a response.
        ctx.send_event(VectorSearchEvent(
            vector_store="knowledge_base",
            query=event.input,
            k=3,
        ))

    @action(VectorSearchResultEvent)
    @staticmethod
    def generate_response(event: VectorSearchResultEvent, ctx: RunnerContext):
        # Use the search results as context for the chat model.
        context = "\n".join(r.document.content for r in event.results)
        enhanced_prompt = f"Context: {context}\nQuestion: {event.request.query}"
        ctx.send_event(ChatRequestEvent(
            model="rag_model",
            messages=[ChatMessage(content=enhanced_prompt)],
        ))
```
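The event flow in the diagram ends with `ChatResponseEvent → OutputEvent`, which the example above stops short of. A closing action along the following lines would complete the loop; this is a sketch that assumes `ChatResponseEvent` exposes the model's reply as `response.content`:

```python
    # Additional action on RAGAgent, completing the flow in the diagram:
    @action(ChatResponseEvent)
    @staticmethod
    def emit_response(event: ChatResponseEvent, ctx: RunnerContext):
        # Forward the chat model's answer as the agent's final output.
        ctx.send_event(OutputEvent(output=event.response.content))
```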
GitHub link: https://github.com/apache/flink-agents/discussions/143