This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 5eb9125d [integrations][java] Fix OpenSearchVectorStore for Amazon OpenSearch Serverless (#678)
5eb9125d is described below
commit 5eb9125d684e04299cf3d407efcd9de9fd371a3b
Author: Avichay Marciano <[email protected]>
AuthorDate: Mon May 18 07:14:17 2026 +0300
[integrations][java] Fix OpenSearchVectorStore for Amazon OpenSearch Serverless (#678)
Fixes #674
Amazon OpenSearch Serverless (AOSS) differs from provisioned OpenSearch Service domains in the
ways listed below; each difference is handled without changing behaviour on provisioned domains:
- SigV4: add x-amz-content-sha256 and Content-Length before signing.
- _refresh: skipped (not exposed on AOSS).
- _bulk: omit the custom _id on serverless and return the AOSS-generated ids from the
  response, so add() callers get usable ids for later get/delete calls.
- updateEmbedding on serverless throws UnsupportedOperationException (there is no
  client-controllable _id, so update-by-id is impossible).
- _bulk partial failures are now surfaced as errors (previously the failed items were silently dropped).
- createKnnIndex pins the FAISS engine with the HNSW method (the default NMSLIB engine on AOSS rejects filtered queries).
- Index propagation: on serverless, wait 15s after creating an index so it is visible before it is used.
Verified end-to-end against a live AOSS VECTORSEARCH collection.
Co-authored-by: Avichay Marciano <[email protected]>
---
.../opensearch/OpenSearchVectorStore.java | 151 +++++++++++++++++++--
1 file changed, 139 insertions(+), 12 deletions(-)
diff --git
a/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
index 0f7cbf1a..f4b74795 100644
---
a/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
+++
b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
@@ -46,6 +46,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
@@ -245,11 +246,18 @@ public class OpenSearchVectorStore extends BaseVectorStore
}
private void createKnnIndex(String idx) {
+ // Use the FAISS engine with HNSW: it supports both pre-filtered and
post-filtered KNN
+ // queries (the default NMSLIB engine on AOSS does NOT support filters
and rejects
+ // queries with "Engine [NMSLIB] does not support filters"), and is
the recommended
+ // engine for both AOSS VECTORSEARCH collections and OpenSearch
Service domains 2.x+.
String body =
String.format(
"{\"settings\":{\"index\":{\"knn\":true}},"
+
"\"mappings\":{\"properties\":{\"%s\":{\"type\":\"knn_vector\","
- +
"\"dimension\":%d},\"%s\":{\"type\":\"text\"},"
+ + "\"dimension\":%d,"
+ +
"\"method\":{\"engine\":\"faiss\",\"name\":\"hnsw\","
+ + "\"space_type\":\"l2\"}},"
+ + "\"%s\":{\"type\":\"text\"},"
+ "\"metadata\":{\"type\":\"object\"}}}}",
vectorField, dims, contentField);
try {
@@ -259,6 +267,16 @@ public class OpenSearchVectorStore extends BaseVectorStore
throw e;
}
}
+ if (serverless) {
+ // AOSS index creation is eventually consistent; the index returns
200 on PUT but
+ // queries against it can fail with "no such index" for ~5-30s
afterward. Give the
+ // service a short window to propagate before any read/write hits
the index.
+ try {
+ Thread.sleep(15_000L);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
/** Sanitize collection name to valid OpenSearch index name (lowercase, no
special chars). */
@@ -326,7 +344,7 @@ public class OpenSearchVectorStore extends BaseVectorStore
}
}
executeRequest("POST", "/" + idx + "/_delete_by_query",
body.toString());
- executeRequest("POST", "/" + idx + "/_refresh", null);
+ refreshIfSupported(idx);
}
@Override
@@ -377,7 +395,19 @@ public class OpenSearchVectorStore extends BaseVectorStore
public void updateEmbedding(
List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs)
throws IOException {
- // OpenSearch's bulk index operation is upsert-by-id, so addEmbedding
doubles as update.
+ // OpenSearch's bulk index operation is upsert-by-id, so addEmbedding
doubles as update on
+ // OpenSearch Service domains. On Amazon OpenSearch Serverless this
pattern cannot work:
+ // AOSS rejects client-supplied _id in create/index operations, so the
addEmbedding path
+ // below has to fall back to AOSS-generated ids. With no
client-controllable id there is
+ // no way to target an existing document, and "updating" would
silently insert a new copy
+ // instead, which would be worse than failing loudly.
+ if (serverless) {
+ throw new UnsupportedOperationException(
+ "updateEmbedding is not supported on Amazon OpenSearch
Serverless: AOSS does"
+ + " not allow clients to specify document ids, so
update-by-id cannot be"
+ + " implemented. Use a provisioned OpenSearch
Service domain if you need"
+ + " in-place embedding updates.");
+ }
// BaseVectorStore.update() already enforces that every document
carries an id, so
// addEmbedding will not generate new ones here.
addEmbedding(documents, collection, extraArgs);
@@ -391,16 +421,26 @@ public class OpenSearchVectorStore extends BaseVectorStore
if (!indexExists(idx)) {
createKnnIndex(idx);
}
- List<String> allIds = new ArrayList<>();
+ List<String> clientIds = new ArrayList<>();
+ // For serverless we accumulate ids returned by AOSS across batches;
for provisioned
+ // domains we keep returning the client-supplied/generated ids that we
sent in _bulk.
+ List<String> aossIds = serverless ? new ArrayList<>() : null;
StringBuilder bulk = new StringBuilder();
int bulkBytes = 0;
for (Document doc : documents) {
String id = doc.getId() != null ? doc.getId() :
UUID.randomUUID().toString();
- allIds.add(id);
+ clientIds.add(id);
ObjectNode action = MAPPER.createObjectNode();
- action.putObject("index").put("_index", idx).put("_id", id);
+ ObjectNode indexAction = action.putObject("index").put("_index",
idx);
+ // Amazon OpenSearch Serverless rejects custom _id in create/index
operations
+ // ("Document ID is not supported in create/index operation
request"). Auto-generated
+ // ids are mandatory on AOSS, so for serverless we omit _id here
and harvest the
+ // AOSS-generated ids out of the _bulk response below.
+ if (!serverless) {
+ indexAction.put("_id", id);
+ }
String actionLine = action.toString() + "\n";
ObjectNode source = MAPPER.createObjectNode();
@@ -419,7 +459,11 @@ public class OpenSearchVectorStore extends BaseVectorStore
int entryBytes = actionLine.length() + sourceLine.length();
if (bulkBytes > 0 && bulkBytes + entryBytes > maxBulkBytes) {
- executeRequest("POST", "/_bulk", bulk.toString());
+ JsonNode resp = executeRequest("POST", "/_bulk",
bulk.toString());
+ checkBulkResponse(resp);
+ if (aossIds != null) {
+ collectBulkIds(resp, aossIds);
+ }
bulk.setLength(0);
bulkBytes = 0;
}
@@ -429,10 +473,14 @@ public class OpenSearchVectorStore extends BaseVectorStore
}
if (bulkBytes > 0) {
- executeRequest("POST", "/_bulk", bulk.toString());
+ JsonNode resp = executeRequest("POST", "/_bulk", bulk.toString());
+ checkBulkResponse(resp);
+ if (aossIds != null) {
+ collectBulkIds(resp, aossIds);
+ }
}
- executeRequest("POST", "/" + idx + "/_refresh", null);
- return allIds;
+ refreshIfSupported(idx);
+ return aossIds != null ? aossIds : clientIds;
}
@SuppressWarnings("unchecked")
@@ -502,6 +550,76 @@ public class OpenSearchVectorStore extends BaseVectorStore
() -> doExecuteRequest(method, path, body),
"OpenSearchRequest");
}
+    /**
+     * Returns the lowercase hexadecimal SHA-256 digest of {@code data}. AOSS requires this value in
+     * the {@code x-amz-content-sha256} header of signed requests.
+     *
+     * @param data the exact request-body bytes that will be sent
+     * @return the 64-character lowercase hex encoding of the digest
+     * @throws IllegalStateException if the runtime provides no SHA-256 implementation (the Java
+     *     platform requires one, so this signals a broken JVM rather than a recoverable error)
+     */
+    private static String sha256Hex(byte[] data) {
+        final MessageDigest sha256;
+        try {
+            sha256 = MessageDigest.getInstance("SHA-256");
+        } catch (java.security.NoSuchAlgorithmException e) {
+            // Catch only the one checked exception getInstance can throw; keep the cause.
+            throw new IllegalStateException("SHA-256 not available", e);
+        }
+        byte[] digest = sha256.digest(data);
+        // Encode each byte as two lowercase hex digits without a String.format call per byte.
+        char[] hex = new char[digest.length * 2];
+        for (int i = 0; i < digest.length; i++) {
+            hex[2 * i] = Character.forDigit((digest[i] >> 4) & 0xF, 16);
+            hex[2 * i + 1] = Character.forDigit(digest[i] & 0xF, 16);
+        }
+        return new String(hex);
+    }
+
+    /**
+     * Fails loudly when a {@code _bulk} request partially failed.
+     *
+     * <p>The OpenSearch {@code _bulk} API returns HTTP 200 even when individual items fail (e.g.
+     * AOSS rejecting a custom {@code _id}). In that case the response carries {@code errors:true}
+     * and per-item {@code error} objects. This method surfaces that as an exception so callers do
+     * not get silent partial-success behaviour.
+     *
+     * @param resp the parsed {@code _bulk} response; {@code null} is treated as "no errors"
+     * @throws RuntimeException if the response reports errors; the message carries the number of
+     *     failed items and the first item error found
+     */
+    private static void checkBulkResponse(JsonNode resp) {
+        if (resp == null || !resp.path("errors").asBoolean(false)) {
+            return;
+        }
+        int failed = 0;
+        String firstError = "unknown";
+        JsonNode items = resp.path("items");
+        if (items.isArray()) {
+            for (JsonNode item : items) {
+                // Each item is keyed by its action type ("index", "create", "update", "delete").
+                // Inspect whichever key is present instead of assuming "index".
+                for (JsonNode result : item) {
+                    JsonNode err = result.path("error");
+                    if (!err.isMissingNode() && !err.isNull()) {
+                        if (failed == 0) {
+                            firstError = err.toString();
+                        }
+                        failed++;
+                    }
+                }
+            }
+        }
+        throw new RuntimeException(
+                "OpenSearch _bulk had errors ("
+                        + failed
+                        + " failed item(s)). First: "
+                        + firstError);
+    }
+
+    /**
+     * Appends the server-assigned {@code _id} of every {@code items[].index} entry in a
+     * {@code _bulk} response to {@code out}, in response order.
+     *
+     * <p>Used on AOSS, where document ids are generated by the service, so that callers of
+     * {@code add()} can later {@code get}/{@code delete} the documents. Call
+     * {@link #checkBulkResponse(JsonNode)} first. A {@code null} response, or one without an
+     * {@code items} array, adds nothing. Entries whose {@code _id} is missing or JSON null are
+     * skipped.
+     *
+     * <p>NOTE(review): a skipped entry would misalign the returned ids with the input documents.
+     * After a successful checkBulkResponse this is not expected to happen.
+     */
+    private static void collectBulkIds(JsonNode resp, List<String> out) {
+        JsonNode items = resp == null ? null : resp.path("items");
+        if (items == null || !items.isArray()) {
+            return;
+        }
+        for (JsonNode entry : items) {
+            JsonNode generatedId = entry.path("index").path("_id");
+            boolean present = !(generatedId.isMissingNode() || generatedId.isNull());
+            if (present) {
+                out.add(generatedId.asText());
+            }
+        }
+    }
+
+    /**
+     * Issues {@code POST /<idx>/_refresh}, except on Amazon OpenSearch Serverless. AOSS does not
+     * expose the {@code _refresh} API (it answers 404), so there the caller relies on the
+     * service's eventual-consistency window instead.
+     *
+     * @param idx name of the index to refresh
+     */
+    private void refreshIfSupported(String idx) {
+        if (!serverless) {
+            String refreshPath = "/" + idx + "/_refresh";
+            executeRequest("POST", refreshPath, null);
+        }
+    }
+
private static boolean isRetryableStatus(Exception e) {
String msg = e.getMessage();
return msg != null
@@ -518,8 +636,17 @@ public class OpenSearchVectorStore extends BaseVectorStore
.putHeader("Content-Type", "application/json");
if (body != null) {
- reqBuilder.contentStreamProvider(
- () -> new
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+ byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ reqBuilder.contentStreamProvider(() -> new
ByteArrayInputStream(bodyBytes));
+ // Amazon OpenSearch Serverless requires both Content-Length
and
+ // x-amz-content-sha256 on signed write requests. The legacy
Aws4Signer
+ // does not populate them when the body is supplied via
+ // contentStreamProvider, so we set them explicitly.
OpenSearch Service
+ // domains accept the request with or without these headers.
+ reqBuilder.putHeader("Content-Length",
String.valueOf(bodyBytes.length));
+ if (useIamAuth) {
+ reqBuilder.putHeader("x-amz-content-sha256",
sha256Hex(bodyBytes));
+ }
}
SdkHttpFullRequest request;