wenjin272 commented on code in PR #341: URL: https://github.com/apache/flink-agents/pull/341#discussion_r2591167233
########## api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.vectorstores; + +import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup; +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; + +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * Base abstract class for vector store. Provides vector store functionality that integrates + * embedding models for text-based semantic search. Handles both connection management and embedding + * generation internally. + * + * @param <ContentT> The type of content stored in the vector store documents + */ +public abstract class BaseVectorStore<ContentT> extends Resource { + + /** Name of the embedding model resource to use. */ + protected final String embeddingModel; + + public BaseVectorStore( + ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) { + super(descriptor, getResource); + this.embeddingModel = descriptor.getArgument("embedding_model"); + } + + @Override + public ResourceType getResourceType() { + return ResourceType.VECTOR_STORE; + } + + /** + * Returns vector store setup settings passed to connection. These parameters are merged with + * query-specific parameters when performing vector search operations. + * + * @return A map containing the store configuration parameters + */ + public abstract Map<String, Object> getStoreKwargs(); + + /** + * Performs vector search using structured query object. Converts text query to embeddings and + * returns structured query result. + * + * @param query VectorStoreQuery object containing query parameters + * @return VectorStoreQueryResult containing the retrieved documents + */ + public VectorStoreQueryResult<ContentT> query(VectorStoreQuery query) { + final BaseEmbeddingModelSetup embeddingModel = + (BaseEmbeddingModelSetup) + this.getResource.apply(this.embeddingModel, ResourceType.EMBEDDING_MODEL); + + // TODO + // for now, we don't need to use additional parameters. + final float[] queryEmbedding = embeddingModel.embed(query.getQueryText(), Map.of()); Review Comment: Could directly use `embeddingModel.embed(query.getQueryText())` here. ########## examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java: ########## @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.examples.rag; + +import org.apache.flink.agents.api.Agent; +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.annotation.*; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.event.ChatRequestEvent; +import org.apache.flink.agents.api.event.ChatResponseEvent; +import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent; +import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelSetup; +import org.apache.flink.agents.integrations.vectorstores.elasticsearch.ElasticsearchVectorStore; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Retrieval-Augmented Generation (RAG) example using Ollama for embeddings and chat along with + * Elasticsearch as the vector store. + * + * <p>This example demonstrates an agent that: + * + * <ul> + * <li>Embeds the incoming user query using an Ollama embedding model, + * <li>Retrieves relevant context from Elasticsearch via a {@code VectorStore}, + * <li>Formats a prompt that includes the retrieved context, and + * <li>Generates a response using an Ollama chat model. + * </ul> + * + * <p>Prerequisites: + * + * <ul> + * <li>Elasticsearch 8.x reachable from this example with an index that contains a {@code + * dense_vector} field for KNN search. + * <li>Ollama running locally (default {@code http://localhost:11434}) with the configured + * embedding and chat models available. 
+ * </ul> + * + * <p>Example Elasticsearch mapping (adjust index name, field, dims, and similarity to your needs): + * + * <pre>{@code + * { + * "mappings": { + * "properties": { + * "content": { "type": "text" }, + * "metadata": { "type": "object", "enabled": false }, + * "content_vector": { "type": "dense_vector", "dims": 768, "similarity": "cosine" } + * } + * } + * } + * }</pre> + * + * <p>System properties you can override: + * + * <ul> + * <li>{@code ES_HOST} (default {@code http://localhost:9200}) + * <li>{@code ES_INDEX} (default {@code my_documents}) + * <li>{@code ES_VECTOR_FIELD} (default {@code content_vector}) + * <li>{@code ES_DIMS} (default {@code 768}) + * <li>{@code ES_SIMILARITY} (default {@code cosine}) — used by the optional population step + * <li>Authentication (optional, used by both vector store and population step): + * <ul> + * <li>{@code ES_API_KEY_BASE64} — Base64 of {@code apiKeyId:apiKeySecret} + * <li>{@code ES_API_KEY_ID} and {@code ES_API_KEY_SECRET} — combined and Base64-encoded + * <li>{@code ES_USERNAME} and {@code ES_PASSWORD} — basic authentication + * </ul> + * <li>{@code OLLAMA_ENDPOINT} (default {@code http://localhost:11434}) + * <li>{@code OLLAMA_EMBEDDING_MODEL} (default {@code nomic-embed-text}) + * <li>{@code OLLAMA_CHAT_MODEL} (default {@code qwen3:8b}) + * <li>{@code ES_POPULATE} (default {@code true}) — whether to populate sample data on startup + * </ul> + * + * <p>Direct CLI flags (optional): <br> + * Instead of or in addition to system properties, you can pass flags when starting the job. CLI + * flags take precedence over existing system properties. + * + * <ul> + * <li>{@code --es.host=http://your-es:9200} + * <li>{@code --es.username=elastic} and {@code --es.password=secret} + * <li>{@code --es.apiKeyBase64=BASE64_ID_COLON_SECRET} or {@code --es.apiKeyId=ID} with {@code + * --es.apiKeySecret=SECRET} + * <li>{@code --es.index=my_documents}, {@code --es.vectorField=content_vector}, {@code + * --es.dims=768}, {@code --es.similarity=cosine} + * <li>{@code --ollama.endpoint=http://localhost:11434}, {@code + * --ollama.embeddingModel=nomic-embed-text}, {@code --ollama.chatModel=qwen3:8b} + * <li>{@code --es.populate=true|false} + * </ul> + * + * <p>Examples: + * + * <pre>{@code + * # Use API key (base64 of id:secret) and custom host + * flink run ... -c org.apache.flink.agents.examples.rag.ElasticsearchRagExample \ + * examples.jar --es.host=http://es:9200 --es.apiKeyBase64=XXXXX= + * + * # Use basic authentication + * flink run ... -c org.apache.flink.agents.examples.rag.ElasticsearchRagExample \ + * examples.jar --es.host=http://es:9200 --es.username=elastic --es.password=secret + * }</pre> + * + * <p>Notes: + * + * <ul> + * <li>Authentication can be provided via either CLI flags or System properties; API key takes + * precedence over basic auth when both are present. + * <li>The optional knowledge base population step uses the same System properties to connect to + * Elasticsearch. + * </ul> + * + * <p>Running the example will: + * + * <ol> + * <li>Optionally populate the Elasticsearch index with sample documents and stored vectors, + * <li>Create a simple agent pipeline that retrieves context from Elasticsearch, and + * <li>Print the model's answers for a set of example queries. 
+ * </ol> + */ +public class ElasticsearchRagExample { + + public static class MyRagAgent extends Agent { + + @Prompt + public static org.apache.flink.agents.api.prompt.Prompt contextEnhancedPrompt() { + String template = + "Based on the following context, please answer the user's question.\n\n" + + "Context:\n{context}\n\n" + + "User Question:\n{user_query}\n\n" + + "Please provide a helpful answer based on the context provided."; + return new org.apache.flink.agents.api.prompt.Prompt(template); + } + + @EmbeddingModelConnection + public static ResourceDescriptor textEmbedderConnection() { + return ResourceDescriptor.Builder.newBuilder( + OllamaEmbeddingModelConnection.class.getName()) + .addInitialArgument( + "host", System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434")) + .addInitialArgument( + "model", + System.getProperty("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")) + .build(); + } + + @EmbeddingModelSetup + public static ResourceDescriptor textEmbedder() { + // Embedding setup referencing the embedding connection name + return ResourceDescriptor.Builder.newBuilder(OllamaEmbeddingModelSetup.class.getName()) + .addInitialArgument("connection", "textEmbedderConnection") + .build(); + } + + @ChatModelConnection + public static ResourceDescriptor ollamaChatModelConnection() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName()) + .addInitialArgument( + "endpoint", + System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434")) + .addInitialArgument("requestTimeout", 120) + .build(); + } + + @ChatModelSetup + public static ResourceDescriptor chatModel() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName()) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument( + "model", System.getProperty("OLLAMA_CHAT_MODEL", "qwen3:8b")) + .build(); + } + + @VectorStore + public static ResourceDescriptor knowledgeBase() { + ResourceDescriptor.Builder builder = + ResourceDescriptor.Builder.newBuilder(ElasticsearchVectorStore.class.getName()) + .addInitialArgument("embedding_model", "textEmbedder") + .addInitialArgument( + "index", System.getProperty("ES_INDEX", "my_documents")) + .addInitialArgument( + "vector_field", + System.getProperty("ES_VECTOR_FIELD", "content_vector")) + .addInitialArgument("dims", Integer.getInteger("ES_DIMS", 768)) + .addInitialArgument( + "host", System.getProperty("ES_HOST", "http://localhost:9200")); + + // Optional authentication + String apiKeyBase64 = System.getProperty("ES_API_KEY_BASE64"); + String apiKeyId = System.getProperty("ES_API_KEY_ID"); + String apiKeySecret = System.getProperty("ES_API_KEY_SECRET"); + String username = System.getProperty("ES_USERNAME"); + String password = System.getProperty("ES_PASSWORD"); + + if (apiKeyBase64 != null && !apiKeyBase64.isEmpty()) { + builder.addInitialArgument("api_key_base64", apiKeyBase64); + } else if (apiKeyId != null + && apiKeySecret != null + && !apiKeyId.isEmpty() + && !apiKeySecret.isEmpty()) { + builder.addInitialArgument("api_key_id", apiKeyId) + .addInitialArgument("api_key_secret", apiKeySecret); + } else if (username != null + && password != null + && !username.isEmpty() + && !password.isEmpty()) { + builder.addInitialArgument("username", username) + .addInitialArgument("password", password); + } + + return builder.build(); + } + + /** + * Converts an incoming {@link InputEvent} into a {@link ContextRetrievalRequestEvent} that + * asks the vector store to fetch relevant documents for the input 
string. The vector store + * resource is referenced by name ({@code "knowledgeBase"}). + */ + @Action(listenEvents = {InputEvent.class}) + public static void processInput(InputEvent event, RunnerContext ctx) { + ctx.sendEvent( + new ContextRetrievalRequestEvent((String) event.getInput(), "knowledgeBase")); + } + + /** + * Receives retrieved documents from the vector store, constructs a context string, formats + * the prompt using the {@code contextEnhancedPrompt}, and emits a {@link ChatRequestEvent} + * targeting the configured chat model. + * + * @param event contains the user query and the list of retrieved documents + * @param context provides access to resources (e.g., the prompt) and lets the agent send + * the next event + */ + @Action(listenEvents = {ContextRetrievalResponseEvent.class}) + public static void processRetrievedContext( + ContextRetrievalResponseEvent<Map<String, Object>> event, RunnerContext context) + throws Exception { + final String userQuery = event.getQuery(); + final List<Document<Map<String, Object>>> docs = event.getDocuments(); + + // Build context text from retrieved documents + List<String> items = new ArrayList<>(); + for (int i = 0; i < docs.size(); i++) { + Object content = docs.get(i).getContent(); + items.add(String.format("%d. %s", i + 1, content)); + } + String contextText = String.join("\n\n", items); Review Comment: Here the content is a `Map<String, Object>` and we just convert it to `String`, so the contextText actually looks like
```
1. {metadata={source=official, topic=flink}, content_vector=[0.015702553, 0.09362559, -0.16092904, ..., 0.007019141, 0.058410715, 0.009363564, -0.046617374, 0.0066441908, 0.010014975, 0.02780144, -0.07676514, -0.035377625, -0.04142186, -0.0014243895], content=Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.}
```
The `metadata` and `content_vector` probably don't need to be passed to the LLM. This also made me wonder: do we really need to use a Map-type document, or is String sufficient? (A rough sketch of what I mean is at the end of this message.) ########## integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.agents.integrations.vectorstores.elasticsearch; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import co.elastic.clients.elasticsearch.core.search.Hit; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.vectorstores.BaseVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.function.BiFunction; + +/** + * Elasticsearch-backed implementation of a vector store. + * + * <p>This implementation executes approximate nearest neighbor (ANN) KNN queries against an + * Elasticsearch index that contains a dense vector field. It integrates with an embedding model + * (configured via the {@code embedding_model} resource argument inherited from {@link + * BaseVectorStore}) to convert query text into embeddings and then performs vector search using + * Elasticsearch's KNN capabilities. + * + * <p>Configuration is provided through {@link + * org.apache.flink.agents.api.resource.ResourceDescriptor} arguments. The most relevant ones are: + * + * <ul> + * <li>{@code index} (required): Target index name. + * <li>{@code vector_field} (required): Name of the dense vector field used for KNN. + * <li>{@code dims} (optional): Vector dimensionality; defaults to {@link #DEFAULT_DIMENSION}. + * <li>{@code k} (optional): Number of nearest neighbors to return; can be overridden per query. + * <li>{@code num_candidates} (optional): Candidate set size for ANN search; can be overridden per + * query. + * <li>{@code filter_query} (optional): A raw JSON Elasticsearch filter query (DSL) that is + * applied as a post-filter; can be overridden per query. + * <li>{@code host} or {@code hosts} (optional): Elasticsearch endpoint(s). If omitted, defaults + * to {@code localhost:9200}. + * <li>Authentication (optional): Either basic auth via {@code username}/{@code password}, or API + * key via {@code api_key_base64} or {@code api_key_id}/{@code api_key_secret}. 
+ * </ul> + * + * <p>Example usage (aligned with ElasticsearchRagExample): + * + * <pre>{@code + * ResourceDescriptor desc = ResourceDescriptor.Builder + * .newBuilder(ElasticsearchVectorStore.class.getName()) + * .addInitialArgument("embedding_model", "textEmbedder") // name of embedding resource + * .addInitialArgument("index", "my_documents") + * .addInitialArgument("vector_field", "content_vector") + * .addInitialArgument("dims", 768) + * .addInitialArgument("host", "http://localhost:9200") + * // Optional auth (API key or basic): + * // .addInitialArgument("api_key_base64", "<BASE64_ID_COLON_SECRET>") + * // .addInitialArgument("username", "elastic") + * // .addInitialArgument("password", "secret") + * .build(); + * }</pre> + */ +public class ElasticsearchVectorStore extends BaseVectorStore<Map<String, Object>> { + + /** Default vector dimensionality used when {@code dims} is not provided. */ + public static final int DEFAULT_DIMENSION = 768; + + /** Low-level Elasticsearch client used to execute search requests. */ + private final ElasticsearchClient client; + + /** Target index name. */ + private final String index; + /** Name of the dense vector field on which KNN queries are executed. */ + private final String vectorField; + /** Vector dimensionality of the {@link #vectorField}. */ + private final int dims; + /** Default value for KNN result size (can be overridden per query). */ + private final Integer k; + /** Default number of ANN candidates for KNN (can be overridden per query). */ + private final Integer numCandidates; + /** Optional default filter query in Elasticsearch JSON DSL (can be overridden per query). */ + private final String filterQuery; + + /** + * Creates a new {@code ElasticsearchVectorStore} from the provided descriptor and resource + * resolver. + * + * <p>The constructor reads connection, authentication, and query defaults from the descriptor + * and prepares an {@link ElasticsearchClient} instance. It also validates required arguments. + * + * @param descriptor Resource descriptor containing configuration arguments + * @param getResource Function to resolve other resources by name and type + * @throws IllegalArgumentException if required arguments are missing or invalid + */ + public ElasticsearchVectorStore( + ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) { + super(descriptor, getResource); + + // Required query-related arguments + this.index = descriptor.getArgument("index"); + this.vectorField = descriptor.getArgument("vector_field"); + final Integer dimsArg = descriptor.getArgument("dims"); + this.dims = (dimsArg != null) ? dimsArg : DEFAULT_DIMENSION; + this.filterQuery = descriptor.getArgument("filter_query"); + + this.k = descriptor.getArgument("k"); + this.numCandidates = descriptor.getArgument("num_candidates"); + + if (this.k != null && this.numCandidates != null) { + if (this.k < this.numCandidates) { + throw new IllegalArgumentException( + "'k' should be greater or equals than 'num_candidates'"); + } + } + + if (this.vectorField == null || this.vectorField.isEmpty()) { + throw new IllegalArgumentException("'vector_field' should not be null or empty"); + } + + if (this.index == null || this.index.isEmpty()) { + throw new IllegalArgumentException("'index' should not be null or empty"); + } + + // Resolve Elasticsearch HTTP hosts. 
Precedence: host -> hosts -> default localhost + final String hostUrl = descriptor.getArgument("host"); + final String hostsCsv = descriptor.getArgument("hosts"); + final List<HttpHost> httpHosts = new ArrayList<>(); + + if (hostUrl != null) { + httpHosts.add(HttpHost.create(hostUrl)); + } else if (hostsCsv != null) { + for (String host : hostsCsv.split(",")) { + httpHosts.add(HttpHost.create(host.trim())); + } + } else { + httpHosts.add(HttpHost.create("localhost:9200")); + } + + final RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0])); + + // Authentication configuration: API key (preferred) or basic auth + final String username = descriptor.getArgument("username"); + final String password = descriptor.getArgument("password"); + final String apiKeyBase64 = descriptor.getArgument("api_key_base64"); + final String apiKeyId = descriptor.getArgument("api_key_id"); + final String apiKeySecret = descriptor.getArgument("api_key_secret"); + + if (apiKeyBase64 != null || (apiKeyId != null && apiKeySecret != null)) { + // Construct base64 token if only id/secret is provided + String token = apiKeyBase64; + if (token == null) { + String idColonSecret = apiKeyId + ":" + apiKeySecret; + token = + Base64.getEncoder() + .encodeToString(idColonSecret.getBytes(StandardCharsets.UTF_8)); + } + final Header[] defaultHeaders = + new Header[] {new BasicHeader("Authorization", "ApiKey " + token)}; + builder.setDefaultHeaders(defaultHeaders); + } else if (username != null && password != null) { + // Fall back to HTTP basic authentication + final BasicCredentialsProvider creds = new BasicCredentialsProvider(); + creds.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + builder.setHttpClientConfigCallback(hcb -> hcb.setDefaultCredentialsProvider(creds)); + } + + // Build the REST client and the transport layer used by the high-level client + final RestClient restClient = builder.build(); + final ElasticsearchTransport transport = + new RestClientTransport(restClient, new JacksonJsonpMapper()); + this.client = new ElasticsearchClient(transport); + } + + /** + * Returns default store-level arguments collected from the descriptor. + * + * <p>The returned map can be merged with per-query arguments to form the complete set of + * parameters for a vector search operation. + * + * @return map of default store arguments such as {@code index}, {@code vector_field}, {@code + * dims}, and optionally {@code k}, {@code num_candidates}, {@code filter_query}. + */ + @Override + public Map<String, Object> getStoreKwargs() { + final Map<String, Object> m = new HashMap<>(); + m.put("index", this.index); + m.put("vector_field", this.vectorField); + m.put("dims", this.dims); + if (this.k != null) { + m.put("k", this.k); + } + if (this.numCandidates != null) { + m.put("num_candidates", this.numCandidates); + } + if (this.filterQuery != null) { + m.put("filter_query", this.filterQuery); + } + return m; + } + + /** + * Executes a KNN vector search using a pre-computed embedding. + * + * <p>The method prepares a KNN search request using the supplied {@code embedding} and merges + * default arguments from the store with the provided {@code args}. Optional filter queries + * (JSON DSL) are applied as a post filter. + * + * @param embedding The embedding vector to search with + * @param limit Maximum number of items the caller is interested in; used as a fallback for + * {@code k} if not explicitly provided + * @param args Additional arguments. 
Supported keys: {@code k}, {@code num_candidates}, {@code + * filter_query} + * @return A list of matching documents, possibly empty + * @throws RuntimeException if the search request fails + */ + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public List<Document<Map<String, Object>>> queryEmbedding( + float[] embedding, int limit, Map<String, Object> args) { + try { + int k = (int) args.getOrDefault("k", Math.max(1, limit)); + + int numCandidates = (int) args.getOrDefault("num_candidates", Math.max(100, k * 2)); + String filter = (String) args.get("filter_query"); + + List<Float> queryVector = new ArrayList<>(embedding.length); + for (float v : embedding) queryVector.add(v); + + SearchRequest.Builder builder = + new SearchRequest.Builder() + .index(this.index) + .knn( + kb -> + kb.field(this.vectorField) + .queryVector(queryVector) + .k(k) + .numCandidates(numCandidates)); + + if (filter != null) { + builder = builder.postFilter(f -> f.withJson(new StringReader(filter))); + } + + final SearchResponse<Map<String, Object>> searchResponse = + (SearchResponse) this.client.search(builder.build(), Map.class); Review Comment: I think this makes sense to some extent. But if different implementations return documents in different structures, user applications must be aware of the implementation they use. If they change the vector store used in their code, they may also have to change the code that processes the returned documents. WDYT @alnzng? ########## integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.agents.integrations.vectorstores.elasticsearch; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import co.elastic.clients.elasticsearch.core.search.Hit; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.vectorstores.BaseVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.function.BiFunction; + +/** + * Elasticsearch-backed implementation of a vector store. + * + * <p>This implementation executes approximate nearest neighbor (ANN) KNN queries against an + * Elasticsearch index that contains a dense vector field. It integrates with an embedding model + * (configured via the {@code embedding_model} resource argument inherited from {@link + * BaseVectorStore}) to convert query text into embeddings and then performs vector search using + * Elasticsearch's KNN capabilities. + * + * <p>Configuration is provided through {@link + * org.apache.flink.agents.api.resource.ResourceDescriptor} arguments. The most relevant ones are: + * + * <ul> + * <li>{@code index} (required): Target index name. + * <li>{@code vector_field} (required): Name of the dense vector field used for KNN. + * <li>{@code dims} (optional): Vector dimensionality; defaults to {@link #DEFAULT_DIMENSION}. + * <li>{@code k} (optional): Number of nearest neighbors to return; can be overridden per query. + * <li>{@code num_candidates} (optional): Candidate set size for ANN search; can be overridden per + * query. + * <li>{@code filter_query} (optional): A raw JSON Elasticsearch filter query (DSL) that is + * applied as a post-filter; can be overridden per query. + * <li>{@code host} or {@code hosts} (optional): Elasticsearch endpoint(s). If omitted, defaults + * to {@code localhost:9200}. + * <li>Authentication (optional): Either basic auth via {@code username}/{@code password}, or API + * key via {@code api_key_base64} or {@code api_key_id}/{@code api_key_secret}. 
+ * </ul> + * + * <p>Example usage (aligned with ElasticsearchRagExample): + * + * <pre>{@code + * ResourceDescriptor desc = ResourceDescriptor.Builder + * .newBuilder(ElasticsearchVectorStore.class.getName()) + * .addInitialArgument("embedding_model", "textEmbedder") // name of embedding resource + * .addInitialArgument("index", "my_documents") + * .addInitialArgument("vector_field", "content_vector") + * .addInitialArgument("dims", 768) + * .addInitialArgument("host", "http://localhost:9200") + * // Optional auth (API key or basic): + * // .addInitialArgument("api_key_base64", "<BASE64_ID_COLON_SECRET>") + * // .addInitialArgument("username", "elastic") + * // .addInitialArgument("password", "secret") + * .build(); + * }</pre> + */ +public class ElasticsearchVectorStore extends BaseVectorStore<Map<String, Object>> { + + /** Default vector dimensionality used when {@code dims} is not provided. */ + public static final int DEFAULT_DIMENSION = 768; + + /** Low-level Elasticsearch client used to execute search requests. */ + private final ElasticsearchClient client; + + /** Target index name. */ + private final String index; + /** Name of the dense vector field on which KNN queries are executed. */ + private final String vectorField; + /** Vector dimensionality of the {@link #vectorField}. */ + private final int dims; + /** Default value for KNN result size (can be overridden per query). */ + private final Integer k; + /** Default number of ANN candidates for KNN (can be overridden per query). */ + private final Integer numCandidates; + /** Optional default filter query in Elasticsearch JSON DSL (can be overridden per query). */ + private final String filterQuery; + + /** + * Creates a new {@code ElasticsearchVectorStore} from the provided descriptor and resource + * resolver. + * + * <p>The constructor reads connection, authentication, and query defaults from the descriptor + * and prepares an {@link ElasticsearchClient} instance. It also validates required arguments. + * + * @param descriptor Resource descriptor containing configuration arguments + * @param getResource Function to resolve other resources by name and type + * @throws IllegalArgumentException if required arguments are missing or invalid + */ + public ElasticsearchVectorStore( + ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) { + super(descriptor, getResource); + + // Required query-related arguments + this.index = descriptor.getArgument("index"); + this.vectorField = descriptor.getArgument("vector_field"); + final Integer dimsArg = descriptor.getArgument("dims"); + this.dims = (dimsArg != null) ? dimsArg : DEFAULT_DIMENSION; + this.filterQuery = descriptor.getArgument("filter_query"); + + this.k = descriptor.getArgument("k"); + this.numCandidates = descriptor.getArgument("num_candidates"); + + if (this.k != null && this.numCandidates != null) { + if (this.k < this.numCandidates) { + throw new IllegalArgumentException( + "'k' should be greater or equals than 'num_candidates'"); + } + } + + if (this.vectorField == null || this.vectorField.isEmpty()) { + throw new IllegalArgumentException("'vector_field' should not be null or empty"); + } + + if (this.index == null || this.index.isEmpty()) { + throw new IllegalArgumentException("'index' should not be null or empty"); + } + + // Resolve Elasticsearch HTTP hosts. 
Precedence: host -> hosts -> default localhost + final String hostUrl = descriptor.getArgument("host"); + final String hostsCsv = descriptor.getArgument("hosts"); + final List<HttpHost> httpHosts = new ArrayList<>(); + + if (hostUrl != null) { + httpHosts.add(HttpHost.create(hostUrl)); + } else if (hostsCsv != null) { + for (String host : hostsCsv.split(",")) { + httpHosts.add(HttpHost.create(host.trim())); + } + } else { + httpHosts.add(HttpHost.create("localhost:9200")); + } + + final RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0])); + + // Authentication configuration: API key (preferred) or basic auth + final String username = descriptor.getArgument("username"); + final String password = descriptor.getArgument("password"); + final String apiKeyBase64 = descriptor.getArgument("api_key_base64"); + final String apiKeyId = descriptor.getArgument("api_key_id"); + final String apiKeySecret = descriptor.getArgument("api_key_secret"); + + if (apiKeyBase64 != null || (apiKeyId != null && apiKeySecret != null)) { + // Construct base64 token if only id/secret is provided + String token = apiKeyBase64; + if (token == null) { + String idColonSecret = apiKeyId + ":" + apiKeySecret; + token = + Base64.getEncoder() + .encodeToString(idColonSecret.getBytes(StandardCharsets.UTF_8)); + } + final Header[] defaultHeaders = + new Header[] {new BasicHeader("Authorization", "ApiKey " + token)}; + builder.setDefaultHeaders(defaultHeaders); + } else if (username != null && password != null) { + // Fall back to HTTP basic authentication + final BasicCredentialsProvider creds = new BasicCredentialsProvider(); + creds.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + builder.setHttpClientConfigCallback(hcb -> hcb.setDefaultCredentialsProvider(creds)); + } + + // Build the REST client and the transport layer used by the high-level client + final RestClient restClient = builder.build(); + final ElasticsearchTransport transport = + new RestClientTransport(restClient, new JacksonJsonpMapper()); + this.client = new ElasticsearchClient(transport); + } + + /** + * Returns default store-level arguments collected from the descriptor. + * + * <p>The returned map can be merged with per-query arguments to form the complete set of + * parameters for a vector search operation. + * + * @return map of default store arguments such as {@code index}, {@code vector_field}, {@code + * dims}, and optionally {@code k}, {@code num_candidates}, {@code filter_query}. + */ + @Override + public Map<String, Object> getStoreKwargs() { + final Map<String, Object> m = new HashMap<>(); + m.put("index", this.index); + m.put("vector_field", this.vectorField); + m.put("dims", this.dims); + if (this.k != null) { + m.put("k", this.k); + } + if (this.numCandidates != null) { + m.put("num_candidates", this.numCandidates); + } + if (this.filterQuery != null) { + m.put("filter_query", this.filterQuery); + } + return m; + } + + /** + * Executes a KNN vector search using a pre-computed embedding. + * + * <p>The method prepares a KNN search request using the supplied {@code embedding} and merges + * default arguments from the store with the provided {@code args}. Optional filter queries + * (JSON DSL) are applied as a post filter. + * + * @param embedding The embedding vector to search with + * @param limit Maximum number of items the caller is interested in; used as a fallback for + * {@code k} if not explicitly provided + * @param args Additional arguments. 
Supported keys: {@code k}, {@code num_candidates}, {@code + * filter_query} + * @return A list of matching documents, possibly empty + * @throws RuntimeException if the search request fails + */ + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public List<Document<Map<String, Object>>> queryEmbedding( + float[] embedding, int limit, Map<String, Object> args) { + try { + int k = (int) args.getOrDefault("k", Math.max(1, limit)); + + int numCandidates = (int) args.getOrDefault("num_candidates", Math.max(100, k * 2)); + String filter = (String) args.get("filter_query"); + + List<Float> queryVector = new ArrayList<>(embedding.length); + for (float v : embedding) queryVector.add(v); Review Comment: Indeed. Sorry for this suggestion.
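Back to the contextText comment on `ElasticsearchRagExample` above, here is a minimal sketch of what I had in mind. It assumes the raw text is indexed under a `content` key and that `Document#getContent()` returns the source map (both depend on the final mapping, so adjust as needed); it is a drop-in for the loop in `processRetrievedContext`:
```java
// Sketch only: build the prompt context from the text field of each retrieved document,
// so that the metadata and content_vector fields are not forwarded to the LLM.
List<String> items = new ArrayList<>();
for (int i = 0; i < docs.size(); i++) {
    Map<String, Object> source = docs.get(i).getContent();
    // "content" is assumed to be the field holding the raw text; fall back to the whole map if absent.
    Object text = source.getOrDefault("content", source);
    items.add(String.format("%d. %s", i + 1, text));
}
String contextText = String.join("\n\n", items);
```
If we settle on a String-typed document instead, this collapses to joining `getContent()` directly, and the Map-vs-String question above goes away.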
