This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 0562bbe4 [doc] Update document for vector store. (#470)
0562bbe4 is described below

commit 0562bbe4c1d22282a787094dd22353ce59080307
Author: Wenjin Xie <[email protected]>
AuthorDate: Mon Jan 26 15:13:42 2026 +0800

    [doc] Update document for vector store. (#470)
---
 docs/content/docs/development/vector_stores.md | 510 ++++++++++++++++++++++---
 1 file changed, 463 insertions(+), 47 deletions(-)

diff --git a/docs/content/docs/development/vector_stores.md b/docs/content/docs/development/vector_stores.md
index 1ef71d7c..a51a925c 100644
--- a/docs/content/docs/development/vector_stores.md
+++ b/docs/content/docs/development/vector_stores.md
@@ -32,41 +32,60 @@ This page covers semantic search using vector stores. 
Additional query modes (ke
 
 Vector stores enable efficient storage, indexing, and retrieval of 
high-dimensional embedding vectors alongside their associated documents. They 
provide the foundation for semantic search capabilities in AI applications by 
allowing fast similarity searches across large document collections.
 
+### Use Case
 In Flink Agents, vector stores are essential for:
 - **Document Retrieval**: Finding relevant documents based on semantic 
similarity
 - **Knowledge Base Search**: Querying large collections of information using 
natural language
 - **Retrieval-Augmented Generation (RAG)**: Providing context to language 
models from vector-indexed knowledge
 - **Semantic Similarity**: Comparing and ranking documents by meaning rather 
than keywords
 
-## Getting Started
+### Concepts
+* **Document**: The abstraction that represents a piece of text and its associated metadata.
+* **Collection**: The abstraction that represents a set of documents. It maps to a different concept in each vector store, such as an index in Elasticsearch or a collection in Chroma.
+
+## How to use
 
 To use vector stores in your agents, you need to configure both a vector store 
and an embedding model, then perform semantic search using structured queries.
 
-### Resource Decorators
+### Declare a vector store in Agent
 
-Flink Agents provides decorators to simplify vector store setup within agents:
+Flink Agents provides decorators/annotations to simplify vector store setup 
within agents:
 
 {{< tabs "Resource Decorators" >}}
 
 {{< tab "Python" >}}
-
-#### @vector_store
-
-The `@vector_store` decorator marks a method that creates a vector store. 
Vector stores automatically integrate with embedding models for text-based 
search.
-
+```python
+@vector_store
+@staticmethod
+def my_vector_store() -> ResourceDescriptor:
+    return ResourceDescriptor(
+        clazz=Constant.CHROMA_VECTOR_STORE,
+        embedding_model="embedding_model",
+        collection="my_chroma_store"
+    )
+```
 {{< /tab >}}
 
 {{< tab "Java" >}}
-
-#### @VectorStore
-
-The `@VectorStore` annotation marks a method that creates a vector store.
-
+```java
+@VectorStore
+public static ResourceDescriptor vectorStore() {
+    return ResourceDescriptor.Builder.newBuilder(Constant.ELASTICSEARCH_VECTOR_STORE)
+            .addInitialArgument("embedding_model", "embeddingModel")
+            .addInitialArgument("host", "http://localhost:9200")
+            .addInitialArgument("index", "my_documents")
+            .addInitialArgument("vector_field", "content_vector")
+            .addInitialArgument("dims", 1536)
+            .build();
+}
+```
 {{< /tab >}}
 
 {{< /tabs >}}
 
-### Query Objects
+### How to query the vector store
+
+#### Query Objects
 
 Vector stores use structured query objects for consistent interfaces:
 
@@ -98,7 +117,7 @@ VectorStoreQuery query = new VectorStoreQuery(
 
 {{< /tabs >}}
 
-### Query Results
+#### Query Results
 
 When you execute a query, you receive a `VectorStoreQueryResult` object that 
contains the search results:
 
@@ -131,6 +150,140 @@ VectorStoreQueryResult result = vectorStore.query(query);
 
 {{< /tabs >}}
 
+### Manage collections
+
+Users can dynamically create, get, or delete collections during agent execution:
+* `get_or_create_collection`: Get a collection by name, creating it if it does not exist. Users can provide additional metadata.
+* `get_collection`: Get a collection by name. The collection must have been created by flink-agents beforehand.
+* `delete_collection`: Delete a collection by name.
+
+{{< hint info >}}
+Collection-level operations are only supported for vector stores that implement `CollectionManageableVectorStore`. Currently, these are Chroma and Elasticsearch.
+{{< /hint >}}
+
+{{< tabs "Collection level operations" >}}
+
+{{< tab "Python" >}}
+
+```python
+# get the vector store from runner context
+vector_store: CollectionManageableVectorStore = ctx.get_resource("vector_store", ResourceType.VECTOR_STORE)
+
+# create a collection
+collection: Collection = vector_store.get_or_create_collection("my_collection", metadata={"key1": "value1", "key2": "value2"})
+# get the collection
+collection: Collection = vector_store.get_collection("my_collection")
+# get the collection metadata
+metadata = collection.metadata
+
+# delete the collection
+vector_store.delete_collection("my_collection")
+```
+
+{{< /tab >}}
+
+{{< tab "Java" >}}
+
+```java
+// get the vector store from runner context
+CollectionManageableVectorStore vectorStore =
+        (CollectionManageableVectorStore)
+                ctx.getResource("vector_store", ResourceType.VECTOR_STORE);
+
+// create a collection
+Collection collection = vectorStore.getOrCreateCollection(
+        "my_collection", Map.of("key1", "value1", "key2", "value2"));
+// get the collection
+collection = vectorStore.getCollection("my_collection");
+// get the collection metadata
+Map<String, Object> metadata = collection.getMetadata();
+
+// delete the collection
+vectorStore.deleteCollection("my_collection");
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+
+### Manage documents
+Users can dynamically add, get, or delete documents during agent execution:
+* `add`: Add documents to a collection. If a document ID is not specified, a random ID is generated for that document.
+* `get`: Get documents from a collection by IDs. If no IDs are provided, all documents are returned.
+* `delete`: Delete documents from a collection by IDs. If no IDs are provided, all documents are deleted.
+
+{{< hint info >}}
+If the collection name is not specified, document-level operations apply to the default collection configured by the vector store initialization parameters.
+{{< /hint >}}
+
+{{< tabs "Document level operations" >}}
+
+{{< tab "Python" >}}
+
+```python
+# get the vector store from runner context
+vector_store: CollectionManageableVectorStore = ctx.get_resource("vector_store", ResourceType.VECTOR_STORE)
+
+# create or get a collection
+collection: Collection = vector_store.get_or_create_collection("my_collection", metadata={"key1": "value1", "key2": "value2"})
+
+# add documents to the collection
+documents = [Document(id="doc1", content="the first doc", metadata={"key": "value1"}),
+             Document(id="doc2", content="the second doc", metadata={"key": "value2"})]
+vector_store.add(documents=documents, collection_name="my_collection")
+
+# get documents by IDs
+docs: List[Document] = vector_store.get(ids="doc2", collection_name="my_collection")
+# get all documents
+docs: List[Document] = vector_store.get(collection_name="my_collection")
+
+# delete documents by IDs
+vector_store.delete(ids=["doc1", "doc2"], collection_name="my_collection")
+# delete all documents
+vector_store.delete(collection_name="my_collection")
+```
+
+{{< /tab >}}
+
+{{< tab "Java" >}}
+
+```java
+// get the vector store from runner context
+BaseVectorStore vectorStore =
+                (BaseVectorStore)
+                        ctx.getResource("vectorStore", ResourceType.VECTOR_STORE);
+// create or get a collection
+Collection collection = ((CollectionManageableVectorStore) vectorStore)
+        .getOrCreateCollection("my_collection", Map.of("key1", "value1", "key2", "value2"));
+
+// add documents to the collection
+List<Document> documents = List.of(
+                new Document(
+                        "the first doc.",
+                        Map.of("key", "value1"),
+                        "doc1"),
+                new Document(
+                        "the second doc",
+                        Map.of("key", "value2"),
+                        "doc2"));
+vectorStore.add(documents, "my_collection", Collections.emptyMap());
+
+// get documents by IDs
+List<Document> docs = vectorStore.get(List.of("doc1"), "my_collection", Collections.emptyMap());
+// get all documents
+docs = vectorStore.get(null, "my_collection", Collections.emptyMap());
+
+// delete documents by IDs
+vectorStore.delete(List.of("doc1", "doc2"), "my_collection", Collections.emptyMap());
+// delete all documents
+vectorStore.delete(null, "my_collection", Collections.emptyMap());
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
 ### Usage Example
 
 Here's how to define and use vector stores in your agent:
@@ -396,27 +549,31 @@ Elasticsearch is currently supported in the Java API 
only. To use Elasticsearch
 #### Prerequisites
 
 1. An Elasticsearch cluster (version 8.0 or later for KNN support).
-2. An index with a `dense_vector` field.
 
 #### ElasticsearchVectorStore Parameters
 
-| Parameter | Type | Default | Description |
-|-----------|------|---------|-------------|
-| `embedding_model` | str | Required | Reference to embedding model resource 
name |
-| `index` | str | Required | Target Elasticsearch index name |
-| `vector_field` | str | Required | Name of the dense vector field used for 
KNN |
-| `dims` | int | `768` | Vector dimensionality |
-| `k` | int | None | Number of nearest neighbors to return; can be overridden 
per query |
-| `num_candidates` | int | None | Candidate set size for ANN search; can be 
overridden per query |
-| `filter_query` | str | None | Raw JSON Elasticsearch filter query (DSL) 
applied as a post-filter |
-| `host` | str | `"http://localhost:9200"` | Elasticsearch endpoint |
-| `hosts` | str | None | Comma-separated list of Elasticsearch endpoints |
-| `username` | str | None | Username for basic authentication |
-| `password` | str | None | Password for basic authentication |
-| `api_key_base64` | str | None | Base64-encoded API key for authentication |
-| `api_key_id` | str | None | API key ID for authentication |
-| `api_key_secret` | str | None | API key secret for authentication |
+| Parameter         | Type | Default                   | Description                                                        |
+|-------------------|------|---------------------------|--------------------------------------------------------------------|
+| `embedding_model` | str  | Required                  | Reference to embedding model resource name                         |
+| `index`           | str  | None                      | Default target Elasticsearch index name                            |
+| `vector_field`    | str  | `"_vector"`               | Name of the dense vector field used for KNN                        |
+| `dims`            | int  | `768`                     | Vector dimensionality                                              |
+| `k`               | int  | None                      | Number of nearest neighbors to return; can be overridden per query |
+| `num_candidates`  | int  | None                      | Candidate set size for ANN search; can be overridden per query     |
+| `filter_query`    | str  | None                      | Raw JSON Elasticsearch filter query (DSL) applied as a post-filter |
+| `host`            | str  | `"http://localhost:9200"` | Elasticsearch endpoint                                             |
+| `hosts`           | str  | None                      | Comma-separated list of Elasticsearch endpoints                    |
+| `username`        | str  | None                      | Username for basic authentication                                  |
+| `password`        | str  | None                      | Password for basic authentication                                  |
+| `api_key_base64`  | str  | None                      | Base64-encoded API key for authentication                          |
+| `api_key_id`      | str  | None                      | API key ID for authentication                                      |
+| `api_key_secret`  | str  | None                      | API key secret for authentication                                  |
+
+{{< hint warning >}}
+For an index not created by flink-agents, the index must have a `dense_vector` field, and the user must specify the field name via `vector_field`.
 
+In addition, such an index cannot be accessed through collection-level operations, because Elasticsearch does not natively support storing index metadata.
+{{< /hint >}}
 #### Usage Example
 
 {{< tabs "Elasticsearch Usage Example" >}}
@@ -579,11 +736,11 @@ public class MyAgent extends Agent {
 The custom provider APIs are experimental and unstable, subject to 
incompatible changes in future releases.
 {{< /hint >}}
 
-If you want to use vector stores not offered by the built-in providers, you 
can extend the base vector store class and implement your own! The vector store 
system is built around the `BaseVectorStore` abstract class.
+If you want to use vector stores not offered by the built-in providers, you 
can extend the base vector store class and implement your own! The vector store 
system is built around the `BaseVectorStore` abstract class and 
`CollectionManageableVectorStore` interface.
 
 ### BaseVectorStore
 
-The base class handles text-to-vector conversion and provides the high-level 
query interface. You only need to implement the core vector search 
functionality.
+The base class handles text-to-vector conversion and provides the high-level 
add and query interface. You only need to implement the core search 
functionality and other basic document level operations. 
 
 {{< tabs "Custom Vector Store" >}}
 
@@ -598,14 +755,95 @@ class MyVectorStore(BaseVectorStore):
         # Return vector store-specific configuration
         # These parameters are merged with query-specific parameters
         return {"index": "my_index", ...}
+    
+    @override
+    def size(self, collection_name: str | None = None) -> int:
+        """Get the size of the collection in vector store.
+
+        Args:
+            collection_name: The target collection. If not provided, use the default collection.
+        """
+        size = ...
+        return size
+
+    @override
+    def get(
+        self,
+        ids: str | List[str] | None = None,
+        collection_name: str | None = None,
+        **kwargs: Any,
+    ) -> List[Document]:
+        """Retrieve documents from the vector store by its ID.
+
+        Args:
+            ids: Unique identifiers of the documents to retrieve. If not provided, get all documents.
+            collection_name: The collection name of the documents to retrieve.
+                             If not provided, use the default collection.
+            **kwargs: Vector store specific parameters (offset, limit, filter, etc.)
+
+        Returns:
+            List of documents retrieved
+        """
+        documents: List[Document] = ...
+        return documents
+
+    @override
+    def delete(
+        self,
+        ids: str | List[str] | None = None,
+        collection_name: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Delete documents in the vector store by its IDs.
+
+        Args:
+            ids: Unique identifiers of the documents to delete. If not provided, delete all documents.
+            collection_name: The name of the collection the documents belong to.
+                             If not provided, use the default collection.
+            **kwargs: Vector store specific parameters (filter etc.)
+        """
+        # delete the documents 
+        pass
 
+    @override
     def query_embedding(self, embedding: List[float], limit: int = 10, 
**kwargs: Any) -> List[Document]:
-        # Core method: perform vector search using pre-computed embedding
-        # - embedding: Pre-computed embedding vector for semantic search
-        # - limit: Maximum number of results to return
-        # - kwargs: Vector store-specific parameters
-        # - Returns: List of Document objects matching the search criteria
-        pass
+        """Perform vector search using pre-computed embedding.
+
+        Args:
+            embedding: Pre-computed embedding vector for semantic search
+            limit: Maximum number of results to return (default: 10)
+            collection_name: The collection to apply the query.
+                             If not provided, use default collection.
+            **kwargs: Vector store-specific parameters (filters, distance 
metrics, etc.)
+
+        Returns:
+            List of documents matching the search criteria
+        """
+        documents: List[Document] = ...
+        return documents
+        
+    @override
+    def _add_embedding(
+        self,
+        *,
+        documents: List[Document],
+        collection_name: str | None = None,
+        **kwargs: Any,
+    ) -> List[str]:
+        """Add documents with pre-computed embeddings to the vector store.
+
+        Args:
+            documents: Documents with embeddings to add to the vector store
+            collection_name: The collection name of the documents to add.
+                             If not provided, use default collection.
+            **kwargs: Vector store-specific parameters (collection, namespace, 
etc.)
+
+        Returns:
+            List of document IDs that were added to the vector store
+        """
+        # add the documents
+        ids: List[str] = ...
+        return ids
 ```
 
 {{< /tab >}}
@@ -629,15 +867,193 @@ public class MyVectorStore extends BaseVectorStore {
         kwargs.put("index", "my_index");
         return kwargs;
     }
+    
+    /**
+     * Get the size of the collection in vector store.
+     *
+     * @param collection The name of the collection to count. If is null, 
count the default
+     *     collection.
+     * @return The documents count in the collection.
+     */
+    @Override
+    public long size(@Nullable String collection) throws Exception {
+        long size = ...;
+        return size;
+    }
 
+    /**
+     * Retrieve documents from the vector store.
+     *
+     * @param ids The ids of the documents. If is null, get all the documents 
or first n documents
+     *     according to implementation specific limit.
+     * @param collection The name of the collection to be retrieved. If is 
null, retrieve the
+     *     default collection.
+     * @param extraArgs Additional arguments.
+     * @return List of documents retrieved.
+     */
     @Override
-    public List<Document> queryEmbedding(float[] embedding, int limit, 
Map<String, Object> args) {
-        // Core method: perform vector search using pre-computed embedding
-        // - embedding: Pre-computed embedding vector for semantic search
-        // - limit: Maximum number of results to return
-        // - args: Vector store-specific parameters
-        // - Returns: List of Document objects matching the search criteria
-        return null;
+    public List<Document> get(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        List<Document> documents = ...;
+        return documents;
+    }
+
+    /**
+     * Delete documents in the vector store.
+     *
+     * @param ids The ids of the documents. If is null, delete all the 
documents.
+     * @param collection The name of the collection the documents belong to. 
If is null, use the
+     *     default collection.
+     * @param extraArgs Additional arguments.
+     */
+    @Override
+    public void delete(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        // delete the documents
+    }
+
+    /**
+     * Performs vector search using a pre-computed embedding.
+     *
+     * @param embedding The embedding vector to search with
+     * @param limit Maximum number of results to return
+     * @param collection The collection to query to. If is null, query the 
default collection.
+     * @param args Additional arguments for the vector search
+     * @return List of documents matching the query embedding
+     */
+    @Override
+    protected List<Document> queryEmbedding(
+            float[] embedding, int limit, @Nullable String collection, 
Map<String, Object> args) {
+        List<Document> documents = ...;
+        return documents;
+    }
+
+    /**
+     * Add documents with pre-computed embedding to vector store.
+     *
+     * @param documents The documents to be added.
+     * @param collection The name of the collection to add to. If is null, add 
to the default
+     *     collection.
+     * @param extraArgs Additional arguments.
+     * @return IDs of the added documents.
+     */
+    @Override
+    protected List<String> addEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        // add the documents
+        List<String> ids = ...;
+        return ids;
+    }
+}
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+### CollectionManageableVectorStore
+
+For vector stores that support collection-level operations, users can additionally implement the following methods.
+
+{{< tabs "Custom Vector Store support Collection" >}}
+
+{{< tab "Python" >}}
+
+```python
+class MyVectorStore(CollectionManageableVectorStore):
+    # Add your custom configuration fields here
+
+    # implementation for `BaseVectorStore` method.
+    
+    @override
+    def get_or_create_collection(
+        self, name: str, metadata: Dict[str, Any] | None = None
+    ) -> Collection:
+        """Get a collection, or create it if it doesn't exist.
+
+        Args:
+            name: Name of the collection
+            metadata: Metadata of the collection
+        Returns:
+            The retrieved or created collection
+        """
+        collection: Collection = ...
+        return collection
+
+    @override
+    def get_collection(self, name: str) -> Collection:
+        """Get a collection, raise an exception if it doesn't exist.
+
+        Args:
+            name: Name of the collection
+        Returns:
+            The retrieved collection
+        """
+        collection: Collection = ...
+        return collection
+
+    @override
+    def delete_collection(self, name: str) -> Collection:
+        """Delete a collection.
+
+        Args:
+            name: Name of the collection
+        Returns:
+            The deleted collection
+        """
+        collection: Collection = ...
+        return collection
+```
+
+{{< /tab >}}
+
+{{< tab "Java" >}}
+
+```java
+public class MyVectorStore extends BaseVectorStore
+        implements CollectionManageableVectorStore {
+    // Add your custom configuration fields here
+
+    // implementation for `BaseVectorStore` method.
+    
+    /**
+     * Get a collection, or create it if it doesn't exist.
+     *
+     * @param name The name of the collection to get or create.
+     * @param metadata The metadata of the collection.
+     * @return The retrieved or created collection.
+     */
+    @Override
+    public Collection getOrCreateCollection(String name, Map<String, Object> metadata) throws Exception {
+        Collection collection = ...;
+        return collection;
+    }
+
+    /**
+     * Get a collection by name.
+     *
+     * @param name The name of the collection to get.
+     * @return The retrieved collection.
+     */
+    @Override
+    public Collection getCollection(String name) throws Exception {
+        Collection collection = ...;
+        return collection;
+    }
+
+    /**
+     * Delete a collection by name.
+     *
+     * @param name The name of the collection to delete.
+     * @return The deleted collection.
+     */
+    @Override
+    public Collection deleteCollection(String name) throws Exception {
+        Collection collection = ...;
+        return collection;
     }
 }
 ```
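
The collection-level and document-level semantics described in this commit (get-or-create, random IDs for documents without one, and "no IDs means all documents") can be illustrated with a toy in-memory store. This is only a sketch under stated assumptions, not Flink Agents code: `InMemoryVectorStore`, and the local `Document` and `Collection` dataclasses, are hypothetical stand-ins that mimic the method names shown in the diff, and no embeddings or similarity search are involved.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Document:
    """Hypothetical stand-in: a piece of text, its metadata, and an optional ID."""
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    id: str | None = None


@dataclass
class Collection:
    """Hypothetical stand-in: a named set of documents with metadata."""
    name: str
    metadata: Dict[str, Any] = field(default_factory=dict)


class InMemoryVectorStore:
    """Toy store mirroring the operation names in the diff (assumption, not the real API)."""

    def __init__(self, default_collection: str = "default") -> None:
        self._default = default_collection
        self._collections: Dict[str, Collection] = {}
        self._docs: Dict[str, Dict[str, Document]] = {}

    def get_or_create_collection(
        self, name: str, metadata: Dict[str, Any] | None = None
    ) -> Collection:
        # Create the collection only on first use; later calls return the same object.
        if name not in self._collections:
            self._collections[name] = Collection(name, dict(metadata or {}))
            self._docs.setdefault(name, {})
        return self._collections[name]

    def get_collection(self, name: str) -> Collection:
        if name not in self._collections:
            raise KeyError(f"collection {name!r} does not exist")
        return self._collections[name]

    def delete_collection(self, name: str) -> Collection:
        collection = self.get_collection(name)
        del self._collections[name]
        self._docs.pop(name, None)
        return collection

    def add(self, documents: List[Document], collection_name: str | None = None) -> List[str]:
        bucket = self._docs.setdefault(collection_name or self._default, {})
        ids: List[str] = []
        for doc in documents:
            # Documents without an ID receive a random one.
            doc.id = doc.id or str(uuid.uuid4())
            bucket[doc.id] = doc
            ids.append(doc.id)
        return ids

    def get(
        self, ids: str | List[str] | None = None, collection_name: str | None = None
    ) -> List[Document]:
        bucket = self._docs.get(collection_name or self._default, {})
        if ids is None:
            return list(bucket.values())  # no IDs: return all documents
        wanted = [ids] if isinstance(ids, str) else list(ids)
        return [bucket[i] for i in wanted if i in bucket]

    def delete(
        self, ids: str | List[str] | None = None, collection_name: str | None = None
    ) -> None:
        bucket = self._docs.get(collection_name or self._default, {})
        if ids is None:
            bucket.clear()  # no IDs: delete all documents
            return
        for i in [ids] if isinstance(ids, str) else list(ids):
            bucket.pop(i, None)
```

A real implementation would delegate these calls to the backing store (for example a Chroma collection or an Elasticsearch index) and would compute embeddings before adding documents; the sketch only pins down the ID and default-collection behavior.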
