This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dad0287521a fix: handle ResponseException correctly, honor
throwWriteExceptions when using RetryConfiguration (#37166)
dad0287521a is described below
commit dad0287521a9337cc466b58af6482b51741a6eef
Author: Egbert van der Wal <[email protected]>
AuthorDate: Fri Dec 26 21:07:25 2025 +0100
fix: handle ResponseException correctly, honor throwWriteExceptions when
using RetryConfiguration (#37166)
---
.../sdk/io/elasticsearch/ElasticsearchIOTest.java | 6 +++
.../sdk/io/elasticsearch/ElasticsearchIOTest.java | 6 +++
.../elasticsearch/ElasticsearchIOTestCommon.java | 49 ++++++++++++++++++++++
.../io/elasticsearch/ElasticsearchIOTestUtils.java | 42 +++++++++++++++++++
.../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 30 ++++++++++---
5 files changed, 127 insertions(+), 6 deletions(-)
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 08bf6e3a298..1e4531202ec 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -316,4 +316,14 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
}
+
+  /**
+   * Checks that an HTTP 500 {@code ResponseException} is retried and, with throwWriteErrors
+   * disabled, reported through {@code FAILED_WRITES} instead of failing the pipeline.
+   */
+  @Test
+  public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
+  }
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 5f3861e69a9..9933f5d1cdc 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -316,4 +316,14 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
}
+
+  /**
+   * Checks that an HTTP 500 {@code ResponseException} is retried and, with throwWriteErrors
+   * disabled, reported through {@code FAILED_WRITES} instead of failing the pipeline.
+   */
+  @Test
+  public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
+  }
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 768b3cfe803..2c911b2014c 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -506,6 +506,62 @@ class ElasticsearchIOTestCommon {
pipeline.run();
}
+  /**
+   * Writes one valid document to a mock server that answers every request with HTTP 500. The
+   * resulting {@code ResponseException} must be retried per the configured {@code
+   * RetryConfiguration}; once retries are exhausted the document must be emitted to {@code
+   * FAILED_WRITES} instead of failing the pipeline, because throwWriteErrors is disabled.
+   */
+  void testWriteWithElasticClientResponseExceptionIsRetried() throws Exception {
+    try (ElasticsearchIOTestUtils.AlwaysFailServer srv =
+        new ElasticsearchIOTestUtils.AlwaysFailServer(0, 500)) {
+      int port = srv.getPort();
+      String[] hosts = {String.format("http://localhost:%d", port)};
+      ConnectionConfiguration clientConfig = ConnectionConfiguration.create(hosts);
+
+      Write write =
+          ElasticsearchIO.write()
+              .withConnectionConfiguration(clientConfig)
+              .withBackendVersion(8) // Mock server does not return proper version
+              .withMaxBatchSize(numDocs + 1)
+              .withMaxBatchSizeBytes(
+                  Long.MAX_VALUE) // Max long number to make sure all docs are flushed in one batch.
+              .withThrowWriteErrors(false)
+              .withRetryConfiguration(
+                  ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000))
+                      .withRetryPredicate(CUSTOM_RETRY_PREDICATE))
+              .withIdFn(new ExtractValueFn("id"))
+              .withUseStatefulBatches(true);
+
+      List<String> data =
+          ElasticsearchIOTestUtils.createDocuments(1, InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+      PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write);
+
+      // The server rejects every request, so the single (valid) document can never be indexed.
+      // With throwWriteErrors disabled it must be routed to FAILED_WRITES once retries run out.
+      PCollection<String> success =
+          outputs
+              .get(Write.SUCCESSFUL_WRITES)
+              .apply("Convert success to input ID", MapElements.via(mapToInputIdString));
+
+      PCollection<String> fail =
+          outputs
+              .get(Write.FAILED_WRITES)
+              .apply("Convert fails to input ID", MapElements.via(mapToInputIdString));
+
+      PAssert.that(success).empty();
+      PAssert.that(fail).containsInAnyOrder("0"); // First and only document
+
+      // Verify the failed response item contains the RETRY_FAILED_LOG error message.
+      String expectedError =
+          String.format(ElasticsearchIO.BulkIO.RETRY_FAILED_LOG, EXPECTED_RETRIES);
+      PAssert.that(outputs.get(Write.FAILED_WRITES))
+          .satisfies(responseItemJsonSubstringValidator(expectedError));
+      pipeline.run();
+    }
+  }
+
void testWriteWithAllowedErrors() throws Exception {
Set<String> allowedErrors = new HashSet<>();
allowedErrors.add("json_parse_exception");
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
index 7e3cd58fd20..102dfffdb0f 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
@@ -27,7 +27,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -555,4 +560,41 @@ class ElasticsearchIOTestUtils {
}
}
};
+
+ /**
+ * Small server that always returns a specified HTTP error code. This is
useful to simulate server
+ * errors in tests.
+ */
+ static class AlwaysFailServer implements AutoCloseable {
+ private final HttpServer server;
+ private final int port;
+
+ AlwaysFailServer(int port, int status) throws IOException {
+ HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+ this.port = server.getAddress().getPort();
+ server.createContext("/", exchange -> handle(exchange, status));
+ server.start();
+
+ this.server = server;
+ }
+
+ int getPort() {
+ return port;
+ }
+
+ private static void handle(HttpExchange exchange, int status) throws
IOException {
+ byte[] response = "Internal Server
Error".getBytes(StandardCharsets.UTF_8);
+ exchange.sendResponseHeaders(status, response.length);
+ try (OutputStream os = exchange.getResponseBody()) {
+ os.write(response);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (server != null) {
+ server.stop(0);
+ }
+ }
+ }
}
diff --git
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index c634bb99e02..ba4ac276994 100644
---
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -2811,14 +2811,23 @@ public class ElasticsearchIO {
}
private boolean isRetryableClientException(Throwable t) {
- // RestClient#performRequest only throws wrapped IOException so we
must inspect the
+ // RestClient#performRequest mainly throws wrapped IOException so we
must inspect the
// exception cause to determine if the exception is likely transient
i.e. retryable or
- // not.
+ // not. One exception is the ResponseException that is thrown when
attempting to parse the
+ // response. This exception is not wrapped.
+
+ // ResponseException should not be wrapped, but check the cause to be
safe for future
+ // changes
+ ResponseException re = null;
+ if (t instanceof ResponseException) {
+ re = (ResponseException) t;
+ } else if (t.getCause() instanceof ResponseException) {
+ re = (ResponseException) t.getCause();
+ }
// Retry for 500-range response code except for 501.
- if (t.getCause() instanceof ResponseException) {
- ResponseException ex = (ResponseException) t.getCause();
- int statusCode = ex.getResponse().getStatusLine().getStatusCode();
+ if (re != null) {
+ int statusCode = re.getResponse().getStatusLine().getStatusCode();
return statusCode >= 500 && statusCode != 501;
}
return t.getCause() instanceof ConnectTimeoutException
@@ -2893,7 +2902,16 @@ public class ElasticsearchIO {
&&
spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
LOG.warn("ES Cluster is responding with HTP 429 -
TOO_MANY_REQUESTS.");
}
- responseEntity = handleRetry("POST", endPoint,
Collections.emptyMap(), requestBody);
+ try {
+ responseEntity = handleRetry("POST", endPoint,
Collections.emptyMap(), requestBody);
+ } catch (java.io.IOException ex) {
+ // No more retry attempts, determine what to do using
throwWriteErrors
+ if (spec.getThrowWriteErrors()) {
+ throw ex;
+ } else {
+ elasticResponseExceptionMessage = ex.getMessage();
+ }
+ }
}
List<Document> responses;