This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new e55516f [api][integration][java] Enrich interface for vector store and implement these for elasticsearch. (#416)
e55516f is described below
commit e55516fd7a9f0740a51c27b7ea7c41cc156f4531
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Jan 7 23:09:16 2026 +0800
[api][integration][java] Enrich interface for vector store and implement these for elasticsearch. (#416)
---
.../agents/api/vectorstores/BaseVectorStore.java | 87 +++-
.../CollectionManageableVectorStore.java | 67 +++
.../flink/agents/api/vectorstores/Document.java | 52 ++
.../agents/api/vectorstores/VectorStoreQuery.java | 26 +-
.../elasticsearch/ElasticsearchVectorStore.java | 576 ++++++++++++++++++++-
.../ElasticsearchVectorStoreTest.java | 151 ++++++
6 files changed, 933 insertions(+), 26 deletions(-)
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
index de74c42..64cc745 100644
--- a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
@@ -23,6 +23,9 @@ import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
@@ -56,6 +59,34 @@ public abstract class BaseVectorStore extends Resource {
*/
public abstract Map<String, Object> getStoreKwargs();
+ /**
+ * Add documents to vector store.
+ *
+ * @param documents The documents to be added.
+ * @param collection The name of the collection to add to. If null, documents will be added to
+ * the default collection.
+ * @param extraArgs The vector store specific arguments.
+ * @return The IDs of the documents added.
+ */
+ public List<String> add(
+ List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs)
+ throws IOException {
+ final BaseEmbeddingModelSetup embeddingModel =
+ (BaseEmbeddingModelSetup)
+ this.getResource.apply(this.embeddingModel, ResourceType.EMBEDDING_MODEL);
+
+ for (Document doc : documents) {
+ if (doc.getEmbedding() == null) {
+ doc.setEmbedding(embeddingModel.embed(doc.getContent()));
+ }
+ }
+
+ final Map<String, Object> storeKwargs = this.getStoreKwargs();
+ storeKwargs.putAll(extraArgs);
+
+ return this.addEmbedding(documents, collection, extraArgs);
+ }
+
/**
* Performs vector search using structured query object. Converts text query to embeddings and
* returns structured query result.
@@ -74,19 +105,69 @@ public abstract class BaseVectorStore extends Resource {
storeKwargs.putAll(query.getExtraArgs());
final List<Document> documents =
- this.queryEmbedding(queryEmbedding, query.getLimit(), storeKwargs);
+ this.queryEmbedding(
+ queryEmbedding, query.getLimit(), query.getCollection(), storeKwargs);
return new VectorStoreQueryResult(documents);
}
+ /**
+ * Get the size of the collection in vector store.
+ *
+ * @param collection The name of the collection to count. If null, counts the default
+ * collection.
+ * @return The number of documents in the collection.
+ */
+ public abstract long size(@Nullable String collection) throws Exception;
+
+ /**
+ * Retrieve documents from the vector store.
+ *
+ * @param ids The ids of the documents. If null, gets all documents, or the first n documents
+ * according to an implementation-specific limit.
+ * @param collection The name of the collection to retrieve from. If null, retrieves from the
+ * default collection.
+ * @param extraArgs Additional arguments.
+ * @return List of documents retrieved.
+ */
+ public abstract List<Document> get(
+ @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs)
+ throws IOException;
+
+ /**
+ * Delete documents in the vector store.
+ *
+ * @param ids The ids of the documents. If null, deletes all documents.
+ * @param collection The name of the collection the documents belong to. If null, uses the
+ * default collection.
+ * @param extraArgs Additional arguments.
+ */
+ public abstract void delete(
+ @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs)
+ throws IOException;
+
/**
* Performs vector search using a pre-computed embedding.
*
* @param embedding The embedding vector to search with
* @param limit Maximum number of results to return
+ * @param collection The collection to query. If null, queries the default collection.
* @param args Additional arguments for the vector search
* @return List of documents matching the query embedding
*/
- public abstract List<Document> queryEmbedding(
- float[] embedding, int limit, Map<String, Object> args);
+ protected abstract List<Document> queryEmbedding(
+ float[] embedding, int limit, @Nullable String collection, Map<String, Object> args);
+
+ /**
+ * Add documents with pre-computed embedding to vector store.
+ *
+ * @param documents The documents to be added.
+ * @param collection The name of the collection to add to. If null, adds to the default
+ * collection.
+ * @param extraArgs Additional arguments.
+ * @return IDs of the added documents.
+ */
+ protected abstract List<String> addEmbedding(
+ List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs)
+ throws IOException;
}
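For readers skimming this diff, a minimal usage sketch of the enriched BaseVectorStore API follows (illustrative only, not part of the commit; the collection name, contents and ids are made up, and store is assumed to be an already constructed subclass such as the Elasticsearch implementation further below):

    import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
    import org.apache.flink.agents.api.vectorstores.Document;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    class VectorStoreUsageSketch {
        static void run(BaseVectorStore store) throws Exception {
            // Add documents; embeddings are computed through the configured embedding model
            // whenever a document carries no pre-computed embedding.
            List<String> ids =
                    store.add(
                            List.of(new Document("hello vectors", Map.of("source", "sketch"), "d1")),
                            "articles",
                            Collections.emptyMap());

            // Collection-aware count, retrieval and deletion introduced by this commit.
            long count = store.size("articles");
            List<Document> fetched = store.get(ids, "articles", Collections.emptyMap());
            store.delete(ids, "articles", Collections.emptyMap());
            System.out.println(count + " documents before deletion, fetched " + fetched.size());
        }
    }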
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/CollectionManageableVectorStore.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/CollectionManageableVectorStore.java
new file mode 100644
index 0000000..0a247ae
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/CollectionManageableVectorStore.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.vectorstores;
+
+import java.util.Map;
+
+/** Interface for vector stores that support collection management. */
+public interface CollectionManageableVectorStore {
+
+ class Collection {
+ private final String name;
+ private final Map<String, Object> metadata;
+
+ public Collection(String name, Map<String, Object> metadata) {
+ this.name = name;
+ this.metadata = metadata;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Map<String, Object> getMetadata() {
+ return metadata;
+ }
+ }
+
+ /**
+ * Get a collection, or create it if it doesn't exist.
+ *
+ * @param name The name of the collection to get or create.
+ * @param metadata The metadata of the collection.
+ * @return The retrieved or created collection.
+ */
+ Collection getOrCreateCollection(String name, Map<String, Object> metadata) throws Exception;
+
+ /**
+ * Get a collection by name.
+ *
+ * @param name The name of the collection to get.
+ * @return The retrieved collection.
+ */
+ Collection getCollection(String name) throws Exception;
+
+ /**
+ * Delete a collection by name.
+ *
+ * @param name The name of the collection to delete.
+ * @return The deleted collection.
+ */
+ Collection deleteCollection(String name) throws Exception;
+}
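A short sketch of how the new collection-management interface might be exercised (illustrative only, not part of the commit; it assumes the concrete store also implements CollectionManageableVectorStore, as the Elasticsearch store below does, and the collection name and metadata are made up):

    import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
    import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore.Collection;

    import java.util.Map;

    class CollectionManagementSketch {
        static void run(CollectionManageableVectorStore store) throws Exception {
            // Create the collection if it is missing, read it back, then drop it.
            Collection created = store.getOrCreateCollection("articles", Map.of("owner", "sketch"));
            Collection fetched = store.getCollection(created.getName());
            store.deleteCollection(fetched.getName());
        }
    }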
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java
index 3fccdc8..00a308c 100644
--- a/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java
@@ -18,7 +18,11 @@
package org.apache.flink.agents.api.vectorstores;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
import java.util.Map;
+import java.util.Objects;
/**
* A document retrieved from vector store search.
@@ -38,10 +42,18 @@ public class Document {
/** Document metadata such as source, author, timestamp, etc. */
private final Map<String, Object> metadata;
+ private @Nullable float[] embedding;
+
public Document(String content, Map<String, Object> metadata, String id) {
+ this(content, metadata, id, null);
+ }
+
+ public Document(
+ String content, Map<String, Object> metadata, String id, @Nullable float[] embedding) {
this.content = content;
this.metadata = metadata;
this.id = id;
+ this.embedding = embedding;
}
public String getContent() {
@@ -55,4 +67,44 @@ public class Document {
public String getId() {
return id;
}
+
+ public void setEmbedding(float[] embedding) {
+ this.embedding = embedding;
+ }
+
+ @Nullable
+ public float[] getEmbedding() {
+ return embedding;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) return false;
+ Document document = (Document) o;
+ return Objects.equals(id, document.id)
+ && Objects.equals(content, document.content)
+ && Objects.equals(metadata, document.metadata)
+ && Arrays.equals(embedding, document.embedding);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, content, metadata, Arrays.hashCode(embedding));
+ }
+
+ @Override
+ public String toString() {
+ return "Document{"
+ + "id='"
+ + id
+ + '\''
+ + ", content='"
+ + content
+ + '\''
+ + ", metadata="
+ + metadata
+ + ", embedding="
+ + Arrays.toString(embedding)
+ + '}';
+ }
}
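A tiny sketch of the extended Document API (illustrative only, not part of the commit; the values are made up):

    import org.apache.flink.agents.api.vectorstores.Document;

    import java.util.Map;

    class DocumentSketch {
        static Document build() {
            // A document may now carry a pre-computed embedding; if it stays null,
            // BaseVectorStore#add computes one through the configured embedding model.
            Document doc = new Document("hello vectors", Map.of("source", "sketch"), "d1");
            doc.setEmbedding(new float[] {0.1f, 0.2f, 0.3f});
            return doc;
        }
    }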
diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
index af31dc6..bc55682 100644
--- a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
@@ -18,6 +18,8 @@
package org.apache.flink.agents.api.vectorstores;
+import javax.annotation.Nullable;
+
import java.util.HashMap;
import java.util.Map;
@@ -36,20 +38,26 @@ public class VectorStoreQuery {
private final String queryText;
/** Maximum number of documents to return. */
private final Integer limit;
+ /** The name of the collection to query. */
+ private final @Nullable String collection;
/** Additional store-specific parameters. */
private final Map<String, Object> extraArgs;
+ public VectorStoreQuery(String queryText, Integer limit) {
+ this(VectorStoreQueryMode.SEMANTIC, queryText, limit, null, new HashMap<>());
+ }
+
/**
* Creates a semantic-search query with default mode {@link VectorStoreQueryMode#SEMANTIC}.
*
* @param queryText the text to embed and search for
* @param limit maximum number of results to return
+ * @param collection the collection to query
+ * @param extraArgs store-specific additional parameters
*/
- public VectorStoreQuery(String queryText, Integer limit) {
- this.mode = VectorStoreQueryMode.SEMANTIC;
- this.queryText = queryText;
- this.limit = limit;
- this.extraArgs = new HashMap<>();
+ public VectorStoreQuery(
+ String queryText, Integer limit, String collection, Map<String, Object> extraArgs) {
+ this(VectorStoreQueryMode.SEMANTIC, queryText, limit, collection, new HashMap<>());
}
/**
@@ -58,16 +66,19 @@ public class VectorStoreQuery {
* @param mode the query mode
* @param queryText the text to search for
* @param limit maximum number of results to return
+ * @param collection the collection to query
* @param extraArgs store-specific additional parameters
*/
public VectorStoreQuery(
VectorStoreQueryMode mode,
String queryText,
Integer limit,
+ @Nullable String collection,
Map<String, Object> extraArgs) {
this.mode = mode;
this.queryText = queryText;
this.limit = limit;
+ this.collection = collection;
this.extraArgs = extraArgs;
}
@@ -90,4 +101,9 @@ public class VectorStoreQuery {
public Map<String, Object> getExtraArgs() {
return extraArgs;
}
+
+ @Nullable
+ public String getCollection() {
+ return collection;
+ }
}
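A minimal sketch of building a collection-aware query (illustrative only, not part of the commit; the query text, limit and collection name are made up, and VectorStoreQueryMode is assumed to live in the same vectorstores package, as its unqualified use above suggests):

    import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
    import org.apache.flink.agents.api.vectorstores.VectorStoreQueryMode;

    import java.util.Map;

    class QuerySketch {
        static VectorStoreQuery build() {
            // Target a specific collection; passing null keeps the store's default collection.
            return new VectorStoreQuery(
                    VectorStoreQueryMode.SEMANTIC, "how do flink agents work", 5, "articles", Map.of());
        }
    }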
diff --git a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
index 4b63289..022f8df 100644
--- a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
+++ b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
@@ -19,9 +19,29 @@
package org.apache.flink.agents.integrations.vectorstores.elasticsearch;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.mapping.DynamicMapping;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.CountRequest;
+import co.elastic.clients.elasticsearch.core.CountResponse;
+import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
+import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
+import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.GetRequest;
+import co.elastic.clients.elasticsearch.core.GetResponse;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.MgetRequest;
+import co.elastic.clients.elasticsearch.core.MgetResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.get.GetResult;
+import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
@@ -31,6 +51,7 @@ import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
import org.apache.flink.agents.api.vectorstores.Document;
import org.apache.http.Header;
import org.apache.http.HttpHost;
@@ -41,6 +62,8 @@ import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
@@ -91,16 +114,30 @@ import java.util.function.BiFunction;
* .build();
* }</pre>
*/
-public class ElasticsearchVectorStore extends BaseVectorStore {
+public class ElasticsearchVectorStore extends BaseVectorStore
+ implements CollectionManageableVectorStore {
/** Default vector dimensionality used when {@code dims} is not provided. */
public static final int DEFAULT_DIMENSION = 768;
+ /** The maximum number of documents that can be retrieved in get. */
+ public static final int MAX_RESULT_WINDOW = 10000;
+
+ public static final String DEFAULT_METADATA_FIELD = "_metadata";
+ public static final String DEFAULT_CONTENT_FIELD = "_content";
+ public static final String DEFAULT_VECTOR_FIELD = "_vector";
+ public static final String COLLECTION_METADATA_INDEX = "collection_metadata";
+ public static final String COLLECTION_METADATA_FIELD = "metadata";
+ public static final String COLLECTION_INDEX_NAME_FIELD = "index_name";
/** Low-level Elasticsearch client used to execute search requests. */
private final ElasticsearchClient client;
- /** Target index name. */
+ /** Default index name. */
private final String index;
+ /** Name of the content field to store the document content. */
+ private final String contentField;
+ /** Name of the metadata field to store additional metadata. */
+ private final String metadataField;
/** Name of the dense vector field on which KNN queries are executed. */
private final String vectorField;
/** Vector dimensionality of the {@link #vectorField}. */
@@ -114,6 +151,9 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
private final ObjectMapper mapper = new ObjectMapper();
+ /** Whether the document content is stored in the content field. */
+ private final boolean storeInContentField;
+
/**
* Creates a new {@code ElasticsearchVectorStore} from the provided descriptor and resource
* resolver.
@@ -129,9 +169,20 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) {
super(descriptor, getResource);
+ this.storeInContentField =
+ Objects.requireNonNullElse(descriptor.getArgument("store_in_content_field"), true);
+
// Required query-related arguments
this.index = descriptor.getArgument("index");
- this.vectorField = descriptor.getArgument("vector_field");
+ this.vectorField =
+ Objects.requireNonNullElse(
+ descriptor.getArgument("vector_field"), DEFAULT_VECTOR_FIELD);
+ this.contentField =
+ Objects.requireNonNullElse(
+ descriptor.getArgument("content_field"), DEFAULT_CONTENT_FIELD);
+ this.metadataField =
+ Objects.requireNonNullElse(
+ descriptor.getArgument("metadata_field"), DEFAULT_METADATA_FIELD);
final Integer dimsArg = descriptor.getArgument("dims");
this.dims = (dimsArg != null) ? dimsArg : DEFAULT_DIMENSION;
this.filterQuery = descriptor.getArgument("filter_query");
@@ -146,14 +197,6 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
}
}
- if (this.vectorField == null || this.vectorField.isEmpty()) {
- throw new IllegalArgumentException("'vector_field' should not be null or empty");
- }
-
- if (this.index == null || this.index.isEmpty()) {
- throw new IllegalArgumentException("'index' should not be null or empty");
- }
-
- // Resolve Elasticsearch HTTP hosts. Precedence: host -> hosts -> default localhost
final String hostUrl = descriptor.getArgument("host");
final String hostsCsv = descriptor.getArgument("hosts");
@@ -205,6 +248,199 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
this.client = new ElasticsearchClient(transport);
}
+ @Override
+ public Collection getOrCreateCollection(String name, Map<String, Object> metadata)
+ throws Exception {
+ // Check if index exists
+ ExistsRequest existsRequest = ExistsRequest.of(e -> e.index(name));
+ boolean exists = this.client.indices().exists(existsRequest).value();
+
+ if (!exists) {
+ // Store collection metadata
+ if (metadata != null && !metadata.isEmpty()) {
+ storeCollectionMetadata(name, metadata);
+ }
+
+ // Create the index corresponding to the collection.
+ createIndex(name, metadata);
+ }
+
+ return new Collection(name, metadata != null ? metadata : Collections.emptyMap());
+ }
+
+ /**
+ * Creates an Elasticsearch index with vector field mapping.
+ *
+ * @param indexName The name of the index to create
+ * @param metadata Optional metadata for the index
+ * @throws IOException if the index creation fails
+ */
+ private void createIndex(String indexName, Map<String, Object> metadata) throws IOException {
+
+ // Build mappings with vector field
+ Map<String, Property> properties = new HashMap<>();
+
+ // Add vector field mapping
+ Property vectorProperty =
+ Property.of(p -> p.denseVector(dv -> dv.dims(this.dims).index(true)));
+ properties.put(this.vectorField, vectorProperty);
+
+ // Add text field for content
+ Property textProperty = Property.of(p -> p.text(t -> t));
+ properties.put(this.contentField, textProperty);
+
+ // Add metadata field mapping
+ Property metadataProperty = Property.of(p -> p.object(o -> o));
+ properties.put(this.metadataField, metadataProperty);
+
+ CreateIndexRequest createRequest =
+ CreateIndexRequest.of(
+ c ->
+ c.index(indexName)
+ .mappings(
+ m ->
+ m.properties(properties)
+ .dynamic(DynamicMapping.True)));
+
+ this.client.indices().create(createRequest);
+ }
+
+ /**
+ * Stores collection metadata in the COLLECTION_METADATA_INDEX.
+ *
+ * <p>This method creates the metadata index if it doesn't exist, and stores the metadata
+ * associated with the given index name. The index name is used as the document ID.
+ *
+ * @param indexName The name of the collection/index
+ * @param metadata The metadata to store
+ * @throws IOException if the operation fails
+ */
+ private void storeCollectionMetadata(String indexName, Map<String, Object> metadata)
+ throws IOException {
+ // Check if metadata index exists
+ ExistsRequest existsRequest = ExistsRequest.of(e -> e.index(COLLECTION_METADATA_INDEX));
+ boolean exists = this.client.indices().exists(existsRequest).value();
+
+ if (!exists) {
+ // Build mappings for metadata index
+ Map<String, Property> properties = new HashMap<>();
+
+ // Add object field for metadata (to store structured metadata)
+ Property metadataProperty = Property.of(p -> p.object(o -> o));
+ properties.put(COLLECTION_METADATA_FIELD, metadataProperty);
+
+ // Add text field for index name
+ Property indexNameProperty = Property.of(p -> p.keyword(k -> k));
+ properties.put(COLLECTION_INDEX_NAME_FIELD, indexNameProperty);
+
+ // Create metadata index
+ CreateIndexRequest createRequest =
+ CreateIndexRequest.of(
+ c ->
+ c.index(COLLECTION_METADATA_INDEX)
+ .mappings(
+ m ->
+ m.properties(properties)
+ .dynamic(DynamicMapping.True)));
+
+ this.client.indices().create(createRequest);
+ }
+
+ // Store metadata document (use indexName as document ID)
+ Map<String, Object> source = new HashMap<>();
+ source.put(COLLECTION_INDEX_NAME_FIELD, indexName);
+ source.put(COLLECTION_METADATA_FIELD, metadata);
+
+ IndexRequest<Map<String, Object>> indexRequest =
+ IndexRequest.of(
+ i -> i.index(COLLECTION_METADATA_INDEX).id(indexName).document(source));
+
+ this.client.index(indexRequest);
+ }
+
+ /**
+ * Gets a collection by name.
+ *
+ * <p>This method retrieves the collection metadata from COLLECTION_METADATA_INDEX using the
+ * collection name as the document ID.
+ *
+ * @param name The name of the collection to retrieve
+ * @return The retrieved collection with its metadata
+ * @throws Exception if the collection doesn't exist or the operation fails
+ */
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public Collection getCollection(String name) throws Exception {
+ // Check if index exists
+ ExistsRequest existsRequest = ExistsRequest.of(e -> e.index(name));
+ boolean exists = this.client.indices().exists(existsRequest).value();
+
+ if (!exists) {
+ throw new RuntimeException(String.format("Collection %s not
found", name));
+ }
+
+ // Get collection metadata from COLLECTION_METADATA_INDEX
+ GetRequest getRequest = GetRequest.of(g -> g.index(COLLECTION_METADATA_INDEX).id(name));
+
+ GetResponse<Map<String, Object>> getResponse =
+ (GetResponse) this.client.get(getRequest, Map.class);
+
+ // Check if document exists
+ if (!getResponse.found()) {
+ throw new RuntimeException(String.format("Metadata for Collection
%s not found", name));
+ }
+
+ // Extract metadata from the document
+ Map<String, Object> source = getResponse.source();
+ if (source == null) {
+ throw new RuntimeException(String.format("Metadata for Collection
%s is null", name));
+ }
+
+ // Get metadata field
+ Map<String, Object> metadata =
+ (Map<String, Object>)
+ source.getOrDefault(COLLECTION_METADATA_FIELD, Collections.emptyMap());
+
+ // Return collection object
+ return new Collection(name, metadata);
+ }
+
+ /**
+ * Deletes a collection by name.
+ *
+ * <p>This method deletes both the collection index and its metadata from
+ * COLLECTION_METADATA_INDEX. It first retrieves the collection metadata, then deletes the index
+ * and the metadata document.
+ *
+ * @param name The name of the collection to delete
+ * @return The deleted collection with its metadata
+ * @throws RuntimeException if the collection doesn't exist or the operation fails
+ */
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public Collection deleteCollection(String name) throws Exception {
+ // First, get the collection metadata before deletion
+ // This ensures the collection exists and retrieves its metadata for the return value.
+ Collection collection;
+ try {
+ collection = getCollection(name);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Collection %s not found or failed to
retrieve", name), e);
+ }
+
+ DeleteIndexRequest deleteIndexRequest = DeleteIndexRequest.of(d -> d.index(name));
+ this.client.indices().delete(deleteIndexRequest);
+
+ // Delete the metadata document from COLLECTION_METADATA_INDEX
+ DeleteRequest deleteRequest =
+ DeleteRequest.of(d -> d.index(COLLECTION_METADATA_INDEX).id(name));
+ this.client.delete(deleteRequest);
+
+ // Return the deleted collection
+ return collection;
+ }
+
/**
* Returns default store-level arguments collected from the descriptor.
*
@@ -236,6 +472,211 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
return m;
}
+ @Override
+ public long size(@Nullable String collection) throws Exception {
+ String index = collection == null ? this.index : collection;
+ CountRequest countRequest = CountRequest.of(c -> c.index(index));
+ CountResponse countResponse = this.client.count(countRequest);
+ return countResponse.count();
+ }
+
+ /**
+ * Retrieve documents from the vector store.
+ *
+ * <p>If ids is not provided, this method retrieves documents according to the limit, offset and
+ * filter_query in the additional arguments. If limit is also not provided, this method retrieves
+ * no more than {@link ElasticsearchVectorStore#MAX_RESULT_WINDOW} documents because of the
+ * Elasticsearch limitation.
+ *
+ * @param ids The ids of the documents.
+ * @param collection The name of the collection to retrieve from. If null, retrieves from the
+ * default collection.
+ * @param extraArgs Additional arguments (limit, offset, filter_query, etc.)
+ * @return List of documents retrieved.
+ */
+ @Override
+ public List<Document> get(
+ @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs)
+ throws IOException {
+ String index = collection == null ? this.index : collection;
+
+ if (ids != null && !ids.isEmpty()) {
+ // Get specific documents by IDs
+ return getDocumentsByIds(index, ids);
+ } else {
+ // Get all documents with optional filters, limit or offset.
+ return getDocuments(index, extraArgs);
+ }
+ }
+
+ /**
+ * Delete documents in the vector store.
+ *
+ * <p>If ids is not provided, this method deletes documents matching the filter_query in the
+ * additional arguments. If filter_query is not provided, this method deletes all the
+ * documents.
+ *
+ * @param ids The ids of the documents.
+ * @param collection The name of the collection the documents belong to. If null, uses the
+ * default collection.
+ * @param extraArgs Additional arguments (filter_query, etc.)
+ */
+ @Override
+ public void delete(
+ @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs)
+ throws IOException {
+ String index = collection == null ? this.index : collection;
+
+ if (ids != null && !ids.isEmpty()) {
+ // Delete specific documents by IDs
+ deleteDocumentsByIds(index, ids);
+ } else {
+ // Delete all documents with optional filters
+ deleteDocuments(index, extraArgs);
+ }
+ }
+
+ /**
+ * Retrieves documents by their IDs using Elasticsearch multi-get API.
+ *
+ * @param index The index to query
+ * @param ids List of document IDs to retrieve
+ * @return List of Documents
+ * @throws IOException if the request fails
+ * @throws JsonProcessingException if JSON processing fails
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private List<Document> getDocumentsByIds(String index, List<String> ids)
+ throws IOException, JsonProcessingException {
+ MgetRequest mgetRequest = MgetRequest.of(m -> m.index(index).ids(ids));
+
+ MgetResponse<Map<String, Object>> mgetResponse =
+ (MgetResponse) this.client.mget(mgetRequest, Map.class);
+
+ List<Document> documents = new ArrayList<>();
+ for (MultiGetResponseItem<Map<String, Object>> item : mgetResponse.docs()) {
+ if (item.isResult()) {
+ GetResult<Map<String, Object>> getResult = item.result();
+ Map<String, Object> source = getResult.source();
+ String id = getResult.id();
+ Document document = getDocument(id, source);
+ documents.add(document);
+ }
+ }
+ return documents;
+ }
+
+ /**
+ * Retrieves documents using Elasticsearch search API with optional filters.
+ *
+ * @param index The index to query
+ * @param extraArgs Additional arguments (limit, offset, filter_query, etc.)
+ * @return List of Documents
+ * @throws IOException if the request fails
+ * @throws JsonProcessingException if JSON processing fails
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private List<Document> getDocuments(String index, Map<String, Object> extraArgs)
+ throws IOException, JsonProcessingException {
+ SearchRequest.Builder builder = new SearchRequest.Builder().index(index);
+
+ // Handle limit (size)
+ Integer limit = (Integer) extraArgs.get("limit");
+ builder.size(Objects.requireNonNullElse(limit, MAX_RESULT_WINDOW));
+
+ // Handle offset (from)
+ Integer offset = (Integer) extraArgs.get("offset");
+ if (offset != null) {
+ builder.from(offset);
+ }
+
+ // Handle filter query
+ String filter = (String) extraArgs.get("filter_query");
+ if (filter != null) {
+ builder.query(q -> q.withJson(new StringReader(filter)));
+ }
+
+ // Execute search
+ SearchResponse<Map<String, Object>> searchResponse =
+ (SearchResponse) this.client.search(builder.build(), Map.class);
+
+ final long total = searchResponse.hits().total().value();
+ if (total == 0) {
+ return Collections.emptyList();
+ }
+
+ return getDocuments((int) total, searchResponse);
+ }
+
+ /**
+ * Deletes documents by their IDs using Elasticsearch bulk delete API.
+ *
+ * @param index The index to delete from
+ * @param ids List of document IDs to delete
+ * @throws IOException if the request fails
+ */
+ private void deleteDocumentsByIds(String index, List<String> ids) throws IOException {
+ // Prepare bulk delete operations
+ List<BulkOperation> bulkOperations = new ArrayList<>();
+ for (String id : ids) {
+ bulkOperations.add(BulkOperation.of(bo -> bo.delete(d -> d.index(index).id(id))));
+ }
+
+ // Execute bulk delete request
+ BulkRequest bulkRequest = BulkRequest.of(br -> br.operations(bulkOperations));
+ BulkResponse bulkResponse = this.client.bulk(bulkRequest);
+
+ // Check for errors
+ if (bulkResponse.errors()) {
+ StringBuilder errorMsg = new StringBuilder("Some documents failed to delete: ");
+ bulkResponse.items().stream()
+ .filter(item -> item.error() != null)
+ .forEach(
+ item ->
+ errorMsg.append(
+ String.format(
+ "id=%s, error=%s; ",
+ item.id(),
+ item.error().reason())));
+ throw new RuntimeException(errorMsg.toString());
+ }
+ }
+
+ /**
+ * Deletes documents using Elasticsearch delete by query API.
+ *
+ * @param index The index to delete from
+ * @param extraArgs Additional arguments (filter_query, etc.)
+ * @throws IOException if the request fails
+ */
+ private void deleteDocuments(String index, Map<String, Object> extraArgs) throws IOException {
+ DeleteByQueryRequest.Builder builder = new DeleteByQueryRequest.Builder().index(index);
+
+ // Handle filter query
+ String filter = (String) extraArgs.get("filter_query");
+ if (filter != null) {
+ builder.query(q -> q.withJson(new StringReader(filter)));
+ } else {
+ // If no filter provided, delete all documents (match_all query)
+ builder.query(q -> q.matchAll(ma -> ma));
+ }
+
+ // Execute delete by query
+ DeleteByQueryResponse response = this.client.deleteByQuery(builder.build());
+
+ // Check for failures
+ if (response.failures() != null && !response.failures().isEmpty()) {
+ StringBuilder errorMsg = new StringBuilder("Some documents failed to delete: ");
+ response.failures()
+ .forEach(
+ failure ->
+ errorMsg.append(
+ String.format(
+ "id=%s, error=%s; ",
+ failure.id(),
+ failure.cause().reason())));
+ throw new RuntimeException(errorMsg.toString());
+ }
+ }
+
/**
* Executes a KNN vector search using a pre-computed embedding.
*
@@ -246,6 +687,7 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
* @param embedding The embedding vector to search with
* @param limit Maximum number of items the caller is interested in; used as a fallback for
* {@code k} if not explicitly provided
+ * @param collection The index to search. If null, searches the default index.
* @param args Additional arguments. Supported keys: {@code k}, {@code num_candidates}, {@code
* filter_query}
* @return A list of matching documents, possibly empty
@@ -253,8 +695,10 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
- public List<Document> queryEmbedding(float[] embedding, int limit, Map<String, Object> args) {
+ public List<Document> queryEmbedding(
+ float[] embedding, int limit, @Nullable String collection, Map<String, Object> args) {
try {
+ String index = collection == null ? this.index : collection;
int k = (int) args.getOrDefault("k", Math.max(1, limit));
int numCandidates = (int) args.getOrDefault("num_candidates", Math.max(100, k * 2));
@@ -265,7 +709,7 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
SearchRequest.Builder builder =
new SearchRequest.Builder()
- .index(this.index)
+ .index(index)
.knn(
kb ->
kb.field(this.vectorField)
@@ -290,6 +734,80 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
}
}
+ /**
+ * Add documents with pre-computed embedding to vector store.
+ *
+ * <p>Elasticsearch stores the document content in the content field.
+ */
+ @Override
+ protected List<String> addEmbedding(
+ List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs)
+ throws IOException {
+ String index = collection == null ? this.index : collection;
+ if (documents == null || documents.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // Prepare bulk operations
+ List<BulkOperation> bulkOperations = new ArrayList<>();
+ List<String> documentIds = new ArrayList<>();
+
+ for (Document doc : documents) {
+ // Generate ID if not provided
+ String id = doc.getId();
+ if (id == null || id.isEmpty()) {
+ id = UUID.randomUUID().toString();
+ }
+ final String docId = id;
+ documentIds.add(docId);
+
+ Map<String, Object> source = new HashMap<>();
+
+ // Add embedding vector if available
+ float[] embedding = doc.getEmbedding();
+ if (embedding != null && embedding.length > 0) {
+ List<Float> embeddingList = new ArrayList<>(embedding.length);
+ for (float v : embedding) {
+ embeddingList.add(v);
+ }
+ source.put(this.vectorField, embeddingList);
+ }
+
+ source.put(this.contentField, doc.getContent());
+
+ // Add metadata
+ Map<String, Object> metadata = doc.getMetadata();
+ if (metadata != null && !metadata.isEmpty()) {
+ source.put(this.metadataField, doc.getMetadata());
+ }
+
+ // Create index operation for bulk request
+ bulkOperations.add(
+ BulkOperation.of(
+ bo -> bo.index(io -> io.index(index).id(docId).document(source))));
+ }
+
+ // Execute bulk request
+ BulkRequest bulkRequest = BulkRequest.of(br -> br.operations(bulkOperations));
+ BulkResponse bulkResponse = this.client.bulk(bulkRequest);
+
+ // Check for errors
+ if (bulkResponse.errors()) {
+ StringBuilder errorMsg = new StringBuilder("Some documents failed to index: ");
+ bulkResponse.items().stream()
+ .filter(item -> item.error() != null)
+ .forEach(
+ item ->
+ errorMsg.append(
+ String.format(
+ "id=%s, error=%s; ",
+ item.id(),
+ item.error().reason())));
+ throw new RuntimeException(errorMsg.toString());
+ }
+
+ return documentIds;
+ }
+
/**
* Converts Elasticsearch hits into {@link Document} instances with metadata.
*
@@ -304,13 +822,35 @@ public class ElasticsearchVectorStore extends BaseVectorStore {
for (Hit<Map<String, Object>> hit : searchResponse.hits().hits()) {
final Map<String, Object> _source = hit.source();
final String id = hit.id();
- final Double score = hit.score();
- final String index = hit.index();
- final Map<String, Object> metadata = Map.of("id", id, "score", score, "index", index);
- final String content = (_source == null) ? "" : mapper.writeValueAsString(_source);
- final Document document = new Document(content, metadata, id);
+ final Document document = getDocument(id, _source);
documents.add(document);
}
return documents;
}
+
+ @SuppressWarnings("unchecked")
+ private Document getDocument(String id, Map<String, Object> source)
+ throws JsonProcessingException {
+ Map<String, Object> metadata = new HashMap<>();
+ String content = "";
+ if (source != null) {
+ Map<String, Object> extra = (Map<String, Object>) source.remove(this.metadataField);
+ if (extra != null) {
+ metadata.putAll(extra);
+ }
+
+ // Elasticsearch supports storing documents as mappings. If storeInContentField is true,
+ // we get the content from the specific field; otherwise we build the content from all
+ // the fields.
+ if (this.storeInContentField) {
+ content = (String) source.get(this.contentField);
+ } else {
+ source.remove(this.vectorField);
+ content = mapper.writeValueAsString(source);
+ }
+ }
+ return new Document(content, metadata, id);
+ }
}
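A sketch of the extra arguments the Elasticsearch implementation honours for get and delete (illustrative only, not part of the commit; the collection name, filter JSON and values are made up, and "_metadata" is the default metadata field named above):

    import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
    import org.apache.flink.agents.api.vectorstores.Document;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class ElasticsearchExtraArgsSketch {
        static void run(BaseVectorStore store) throws Exception {
            // get() without ids honours "limit", "offset" and an Elasticsearch query DSL
            // snippet under "filter_query"; delete() without ids honours "filter_query"
            // and otherwise removes every document via match_all.
            Map<String, Object> args = new HashMap<>();
            args.put("limit", 100);
            args.put("offset", 0);
            args.put("filter_query", "{\"match\": {\"_metadata.category\": \"ai-agent\"}}");

            List<Document> matched = store.get(null, "articles", args);
            store.delete(null, "articles", Map.of("filter_query", "{\"match_all\": {}}"));
            System.out.println("matched " + matched.size() + " documents");
        }
    }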
diff --git a/integrations/vector-stores/elasticsearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStoreTest.java b/integrations/vector-stores/elasticsearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStoreTest.java
new file mode 100644
index 0000000..2aba422
--- /dev/null
+++ b/integrations/vector-stores/elasticsearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStoreTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.elasticsearch;
+
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore.Collection;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for {@link ElasticsearchVectorStore}.
+ *
+ * <p>An Elasticsearch server must be set up to run this test. See <a
+ * href="https://www.elastic.co/docs/deploy-manage/deploy/self-managed/install-elasticsearch-docker-basic">Start
+ * a single-node cluster in Docker</a> for details.
+ *
+ * <p>Because {@link ElasticsearchVectorStore} doesn't support security checks yet, the container
+ * should be started with the "-e xpack.security.enabled=false" option.
+ */
+@Disabled("Should setup Elasticsearch server.")
+public class ElasticsearchVectorStoreTest {
+ public static BaseVectorStore store;
+
+ public static Resource getResource(String name, ResourceType type) {
+ BaseEmbeddingModelSetup embeddingModel = Mockito.mock(BaseEmbeddingModelSetup.class);
+ Mockito.when(embeddingModel.embed("Elasticsearch is a search engine"))
+ .thenReturn(new float[] {0.2f, 0.3f, 0.4f, 0.5f, 0.6f});
+ Mockito.when(
+ embeddingModel.embed(
+ "Apache Flink Agents is an Agentic AI
framework based on Apache Flink."))
+ .thenReturn(new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
+ return embeddingModel;
+ }
+
+ @BeforeAll
+ public static void initialize() {
+ final ResourceDescriptor.Builder builder =
+ ResourceDescriptor.Builder.newBuilder(ElasticsearchVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "embeddingModel")
+ .addInitialArgument("host", "localhost:9200")
+ .addInitialArgument("dims", 5)
+ .addInitialArgument("username", "elastic")
+ .addInitialArgument("password", System.getenv("ES_PASSWORD"));
+ store =
+ new ElasticsearchVectorStore(
+ builder.build(), ElasticsearchVectorStoreTest::getResource);
+ }
+
+ @Test
+ public void testCollectionManagement() throws Exception {
+ CollectionManageableVectorStore vectorStore = (CollectionManageableVectorStore) store;
+ String name = "collection_management";
+ Map<String, Object> metadata = Map.of("key1", "value1", "key2", "value2");
+ vectorStore.getOrCreateCollection(name, metadata);
+
+ Collection collection = vectorStore.getCollection(name);
+
+ Assertions.assertNotNull(collection);
+ Assertions.assertEquals(name, collection.getName());
+ Assertions.assertEquals(0, store.size(name));
+ Assertions.assertEquals(metadata, collection.getMetadata());
+
+ vectorStore.deleteCollection(name);
+
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> vectorStore.getCollection(name),
+ String.format("Collection %s not found", name));
+ }
+
+ @Test
+ public void testDocumentManagement() throws Exception {
+ String name = "document_management";
+ Map<String, Object> metadata = Map.of("key1", "value1", "key2", "value2");
+ ((CollectionManageableVectorStore) store).getOrCreateCollection(name, metadata);
+
+ List<Document> documents = new ArrayList<>();
+ documents.add(
+ new Document(
+ "Elasticsearch is a search engine",
+ Map.of("category", "database", "source", "test"),
+ "doc1"));
+ documents.add(
+ new Document(
+ "Apache Flink Agents is an Agentic AI framework based
on Apache Flink.",
+ Map.of("category", "ai-agent", "source", "test"),
+ "doc2"));
+ store.add(documents, name, Collections.emptyMap());
+ // wait for the documents to become visible in the elasticsearch server.
+ Thread.sleep(1000);
+ for (Document doc : documents) {
+ doc.setEmbedding(null);
+ }
+
+ // test get all documents
+ List<Document> all = store.get(null, name, Collections.emptyMap());
+ Assertions.assertEquals(documents, all);
+
+ // test get specific document
+ List<Document> specific =
+ store.get(Collections.singletonList("doc1"), name, Collections.emptyMap());
+ Assertions.assertEquals(1, specific.size());
+ Assertions.assertEquals(documents.get(0), specific.get(0));
+
+ // test delete specific document
+ store.delete(Collections.singletonList("doc1"), name, Collections.emptyMap());
+ Thread.sleep(1000);
+ List<Document> remain = store.get(null, name, Collections.emptyMap());
+ Assertions.assertEquals(1, remain.size());
+ Assertions.assertEquals(documents.get(1), remain.get(0));
+
+ // test delete all documents
+ store.delete(null, name, Collections.emptyMap());
+ Thread.sleep(1000);
+ List<Document> empty = store.get(null, name, Collections.emptyMap());
+ Assertions.assertTrue(empty.isEmpty());
+
+ ((CollectionManageableVectorStore) store).deleteCollection(name);
+ }
+}