This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 5eb9125d [integrations][java] Fix OpenSearchVectorStore for Amazon OpenSearch Serverless (#678)
5eb9125d is described below

commit 5eb9125d684e04299cf3d407efcd9de9fd371a3b
Author: Avichay Marciano <[email protected]>
AuthorDate: Mon May 18 07:14:17 2026 +0300

    [integrations][java] Fix OpenSearchVectorStore for Amazon OpenSearch Serverless (#678)
    
    Fixes #674
    
    AOSS-specific differences from provisioned domains, addressed without
    changing domain behaviour:
    
    - SigV4: add x-amz-content-sha256 and Content-Length before signing.
    - _refresh: skipped (not exposed on AOSS).
    - _bulk: omit custom _id on serverless and return AOSS-generated ids from
      the response so add() callers get usable ids for later get/delete.
    - updateEmbedding on serverless throws UnsupportedOperationException
      (no client-controllable _id, so update-by-id is impossible).
    - _bulk partial failures now surfaced (was silently dropping data).
    - createKnnIndex pins FAISS/HNSW (default NMSLIB on AOSS rejects filters).
    - Index propagation: 15s settle after create on serverless.
    
    Verified end-to-end against a live AOSS VECTORSEARCH collection.
    
    Co-authored-by: Avichay Marciano <[email protected]>
---
 .../opensearch/OpenSearchVectorStore.java          | 151 +++++++++++++++++++--
 1 file changed, 139 insertions(+), 12 deletions(-)

diff --git a/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
index 0f7cbf1a..f4b74795 100644
--- a/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
+++ b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
@@ -46,6 +46,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.HashMap;
@@ -245,11 +246,18 @@ public class OpenSearchVectorStore extends BaseVectorStore
     }
 
     private void createKnnIndex(String idx) {
+        // Use the FAISS engine with HNSW: it supports both pre-filtered and post-filtered KNN
+        // queries (the default NMSLIB engine on AOSS does NOT support filters and rejects
+        // queries with "Engine [NMSLIB] does not support filters"), and is the recommended
+        // engine for both AOSS VECTORSEARCH collections and OpenSearch Service domains 2.x+.
         String body =
                 String.format(
                         "{\"settings\":{\"index\":{\"knn\":true}},"
                                 + 
"\"mappings\":{\"properties\":{\"%s\":{\"type\":\"knn_vector\","
-                                + 
"\"dimension\":%d},\"%s\":{\"type\":\"text\"},"
+                                + "\"dimension\":%d,"
+                                + 
"\"method\":{\"engine\":\"faiss\",\"name\":\"hnsw\","
+                                + "\"space_type\":\"l2\"}},"
+                                + "\"%s\":{\"type\":\"text\"},"
                                 + "\"metadata\":{\"type\":\"object\"}}}}",
                         vectorField, dims, contentField);
         try {
@@ -259,6 +267,16 @@ public class OpenSearchVectorStore extends BaseVectorStore
                 throw e;
             }
         }
+        if (serverless) {
+            // AOSS index creation is eventually consistent; the index returns 
200 on PUT but
+            // queries against it can fail with "no such index" for ~5-30s 
afterward. Give the
+            // service a short window to propagate before any read/write hits 
the index.
+            try {
+                Thread.sleep(15_000L);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     /** Sanitize collection name to valid OpenSearch index name (lowercase, no 
special chars). */
@@ -326,7 +344,7 @@ public class OpenSearchVectorStore extends BaseVectorStore
             }
         }
         executeRequest("POST", "/" + idx + "/_delete_by_query", 
body.toString());
-        executeRequest("POST", "/" + idx + "/_refresh", null);
+        refreshIfSupported(idx);
     }
 
     @Override
@@ -377,7 +395,19 @@ public class OpenSearchVectorStore extends BaseVectorStore
     public void updateEmbedding(
             List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
             throws IOException {
-        // OpenSearch's bulk index operation is upsert-by-id, so addEmbedding 
doubles as update.
+        // OpenSearch's bulk index operation is upsert-by-id, so addEmbedding 
doubles as update on
+        // OpenSearch Service domains. On Amazon OpenSearch Serverless this 
pattern cannot work:
+        // AOSS rejects client-supplied _id in create/index operations, so the 
addEmbedding path
+        // below has to fall back to AOSS-generated ids. With no 
client-controllable id there is
+        // no way to target an existing document, and "updating" would 
silently insert a new copy
+        // instead, which would be worse than failing loudly.
+        if (serverless) {
+            throw new UnsupportedOperationException(
+                    "updateEmbedding is not supported on Amazon OpenSearch 
Serverless: AOSS does"
+                            + " not allow clients to specify document ids, so 
update-by-id cannot be"
+                            + " implemented. Use a provisioned OpenSearch 
Service domain if you need"
+                            + " in-place embedding updates.");
+        }
         // BaseVectorStore.update() already enforces that every document 
carries an id, so
         // addEmbedding will not generate new ones here.
         addEmbedding(documents, collection, extraArgs);
@@ -391,16 +421,26 @@ public class OpenSearchVectorStore extends BaseVectorStore
         if (!indexExists(idx)) {
             createKnnIndex(idx);
         }
-        List<String> allIds = new ArrayList<>();
+        List<String> clientIds = new ArrayList<>();
+        // For serverless we accumulate ids returned by AOSS across batches; 
for provisioned
+        // domains we keep returning the client-supplied/generated ids that we 
sent in _bulk.
+        List<String> aossIds = serverless ? new ArrayList<>() : null;
         StringBuilder bulk = new StringBuilder();
         int bulkBytes = 0;
 
         for (Document doc : documents) {
             String id = doc.getId() != null ? doc.getId() : 
UUID.randomUUID().toString();
-            allIds.add(id);
+            clientIds.add(id);
 
             ObjectNode action = MAPPER.createObjectNode();
-            action.putObject("index").put("_index", idx).put("_id", id);
+            ObjectNode indexAction = action.putObject("index").put("_index", 
idx);
+            // Amazon OpenSearch Serverless rejects custom _id in create/index 
operations
+            // ("Document ID is not supported in create/index operation 
request"). Auto-generated
+            // ids are mandatory on AOSS, so for serverless we omit _id here 
and harvest the
+            // AOSS-generated ids out of the _bulk response below.
+            if (!serverless) {
+                indexAction.put("_id", id);
+            }
             String actionLine = action.toString() + "\n";
 
             ObjectNode source = MAPPER.createObjectNode();
@@ -419,7 +459,11 @@ public class OpenSearchVectorStore extends BaseVectorStore
             int entryBytes = actionLine.length() + sourceLine.length();
 
             if (bulkBytes > 0 && bulkBytes + entryBytes > maxBulkBytes) {
-                executeRequest("POST", "/_bulk", bulk.toString());
+                JsonNode resp = executeRequest("POST", "/_bulk", 
bulk.toString());
+                checkBulkResponse(resp);
+                if (aossIds != null) {
+                    collectBulkIds(resp, aossIds);
+                }
                 bulk.setLength(0);
                 bulkBytes = 0;
             }
@@ -429,10 +473,14 @@ public class OpenSearchVectorStore extends BaseVectorStore
         }
 
         if (bulkBytes > 0) {
-            executeRequest("POST", "/_bulk", bulk.toString());
+            JsonNode resp = executeRequest("POST", "/_bulk", bulk.toString());
+            checkBulkResponse(resp);
+            if (aossIds != null) {
+                collectBulkIds(resp, aossIds);
+            }
         }
-        executeRequest("POST", "/" + idx + "/_refresh", null);
-        return allIds;
+        refreshIfSupported(idx);
+        return aossIds != null ? aossIds : clientIds;
     }
 
     @SuppressWarnings("unchecked")
@@ -502,6 +550,76 @@ public class OpenSearchVectorStore extends BaseVectorStore
                 () -> doExecuteRequest(method, path, body), 
"OpenSearchRequest");
     }
 
+    /** SHA-256 hex of the given bytes. Required by AOSS as 
x-amz-content-sha256. */
+    private static String sha256Hex(byte[] data) {
+        try {
+            byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
+            StringBuilder sb = new StringBuilder(digest.length * 2);
+            for (byte b : digest) {
+                sb.append(String.format("%02x", b));
+            }
+            return sb.toString();
+        } catch (Exception e) {
+            throw new RuntimeException("SHA-256 not available", e);
+        }
+    }
+
+    /**
+     * The OpenSearch _bulk API returns HTTP 200 even when individual items 
fail (e.g. AOSS
+     * rejecting custom _id). The response has {@code errors:true} when any 
item failed; surface
+     * that as an exception so callers don't get silent partial-success 
behaviour.
+     */
+    private static void checkBulkResponse(JsonNode resp) {
+        if (resp != null && resp.has("errors") && 
resp.get("errors").asBoolean()) {
+            String firstError = "unknown";
+            JsonNode items = resp.path("items");
+            if (items.isArray()) {
+                for (JsonNode it : items) {
+                    JsonNode err = it.path("index").path("error");
+                    if (!err.isMissingNode()) {
+                        firstError = err.toString();
+                        break;
+                    }
+                }
+            }
+            throw new RuntimeException("OpenSearch _bulk had errors. First: " 
+ firstError);
+        }
+    }
+
+    /**
+     * Extract the {@code _id} from each {@code items[].index} entry of a 
successful {@code _bulk}
+     * response and append to {@code out}. Used on AOSS where ids are 
server-generated and the
+     * caller of {@code add()} needs them in order to later {@code get}/{@code 
delete} the
+     * documents. Caller must invoke {@link #checkBulkResponse(JsonNode)} 
first.
+     */
+    private static void collectBulkIds(JsonNode resp, List<String> out) {
+        if (resp == null) {
+            return;
+        }
+        JsonNode items = resp.path("items");
+        if (!items.isArray()) {
+            return;
+        }
+        for (JsonNode it : items) {
+            JsonNode idNode = it.path("index").path("_id");
+            if (!idNode.isMissingNode() && !idNode.isNull()) {
+                out.add(idNode.asText());
+            }
+        }
+    }
+
+    /**
+     * Refreshes the index if the underlying service supports it. Amazon 
OpenSearch Serverless does
+     * NOT expose the {@code _refresh} API and returns 404 — for AOSS we rely 
on the service's
+     * eventual-consistency window (~1-30s) instead.
+     */
+    private void refreshIfSupported(String idx) {
+        if (serverless) {
+            return;
+        }
+        executeRequest("POST", "/" + idx + "/_refresh", null);
+    }
+
     private static boolean isRetryableStatus(Exception e) {
         String msg = e.getMessage();
         return msg != null
@@ -518,8 +636,17 @@ public class OpenSearchVectorStore extends BaseVectorStore
                             .putHeader("Content-Type", "application/json");
 
             if (body != null) {
-                reqBuilder.contentStreamProvider(
-                        () -> new 
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+                byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+                reqBuilder.contentStreamProvider(() -> new 
ByteArrayInputStream(bodyBytes));
+                // Amazon OpenSearch Serverless requires both Content-Length 
and
+                // x-amz-content-sha256 on signed write requests. The legacy 
Aws4Signer
+                // does not populate them when the body is supplied via
+                // contentStreamProvider, so we set them explicitly. 
OpenSearch Service
+                // domains accept the request with or without these headers.
+                reqBuilder.putHeader("Content-Length", 
String.valueOf(bodyBytes.length));
+                if (useIamAuth) {
+                    reqBuilder.putHeader("x-amz-content-sha256", 
sha256Hex(bodyBytes));
+                }
             }
 
             SdkHttpFullRequest request;

Reply via email to