This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dad0287521a fix: handle ResponseException correctly, honor throwWriteExceptions when using RetryConfiguration (#37166)
dad0287521a is described below

commit dad0287521a9337cc466b58af6482b51741a6eef
Author: Egbert van der Wal <[email protected]>
AuthorDate: Fri Dec 26 21:07:25 2025 +0100

    fix: handle ResponseException correctly, honor throwWriteExceptions when using RetryConfiguration (#37166)
---
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  |  6 +++
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  |  6 +++
 .../elasticsearch/ElasticsearchIOTestCommon.java   | 49 ++++++++++++++++++++++
 .../io/elasticsearch/ElasticsearchIOTestUtils.java | 42 +++++++++++++++++++
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 30 ++++++++++---
 5 files changed, 127 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 08bf6e3a298..1e4531202ec 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -316,4 +316,10 @@ public class ElasticsearchIOTest implements Serializable {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
   }
+
+  @Test
+  public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 5f3861e69a9..9933f5d1cdc 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -316,4 +316,10 @@ public class ElasticsearchIOTest implements Serializable {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
   }
+
+  @Test
+  public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 768b3cfe803..2c911b2014c 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -506,6 +506,55 @@ class ElasticsearchIOTestCommon {
     pipeline.run();
   }
 
+  void testWriteWithElasticClientResponseExceptionIsRetried() throws Exception {
+    try (ElasticsearchIOTestUtils.AlwaysFailServer srv =
+        new ElasticsearchIOTestUtils.AlwaysFailServer(0, 500)) {
+      int port = srv.getPort();
+      String[] hosts = {String.format("http://localhost:%d", port)};
+      ConnectionConfiguration clientConfig = ConnectionConfiguration.create(hosts);
+
+      Write write =
+          ElasticsearchIO.write()
+              .withConnectionConfiguration(clientConfig)
+              .withBackendVersion(8) // Mock server does not return proper version
+              .withMaxBatchSize(numDocs + 1)
+              .withMaxBatchSizeBytes(
+                  Long.MAX_VALUE) // Max long number to make sure all docs are flushed in one batch.
+              .withThrowWriteErrors(false)
+              .withRetryConfiguration(
+                  ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000))
+                      .withRetryPredicate(CUSTOM_RETRY_PREDICATE))
+              .withIdFn(new ExtractValueFn("id"))
+              .withUseStatefulBatches(true);
+
+      List<String> data =
+          ElasticsearchIOTestUtils.createDocuments(1, InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+      PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write);
+
+      // The whole batch should fail and direct to tag FAILED_WRITES because of one invalid doc.
+      PCollection<String> success =
+          outputs
+              .get(Write.SUCCESSFUL_WRITES)
+              .apply("Convert success to input ID", MapElements.via(mapToInputIdString));
+
+      PCollection<String> fail =
+          outputs
+              .get(Write.FAILED_WRITES)
+              .apply("Convert fails to input ID", MapElements.via(mapToInputIdString));
+
+      PAssert.that(success).empty();
+      PAssert.that(fail).containsInAnyOrder("0"); // First and only document
+
+      // Verify response item contains the corresponding error message.
+      String expectedError =
+          String.format(ElasticsearchIO.BulkIO.RETRY_FAILED_LOG, EXPECTED_RETRIES);
+      PAssert.that(outputs.get(Write.FAILED_WRITES))
+          .satisfies(responseItemJsonSubstringValidator(expectedError));
+      pipeline.run();
+    }
+  }
+
   void testWriteWithAllowedErrors() throws Exception {
     Set<String> allowedErrors = new HashSet<>();
     allowedErrors.add("json_parse_exception");
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
index 7e3cd58fd20..102dfffdb0f 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
@@ -27,7 +27,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -555,4 +560,41 @@ class ElasticsearchIOTestUtils {
           }
         }
       };
+
+  /**
+   * Small server that always returns a specified HTTP error code. This is useful to simulate server
+   * errors in tests.
+   */
+  static class AlwaysFailServer implements AutoCloseable {
+    private final HttpServer server;
+    private final int port;
+
+    AlwaysFailServer(int port, int status) throws IOException {
+      HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+      this.port = server.getAddress().getPort();
+      server.createContext("/", exchange -> handle(exchange, status));
+      server.start();
+
+      this.server = server;
+    }
+
+    int getPort() {
+      return port;
+    }
+
+    private static void handle(HttpExchange exchange, int status) throws IOException {
+      byte[] response = "Internal Server Error".getBytes(StandardCharsets.UTF_8);
+      exchange.sendResponseHeaders(status, response.length);
+      try (OutputStream os = exchange.getResponseBody()) {
+        os.write(response);
+      }
+    }
+
+    @Override
+    public void close() throws Exception {
+      if (server != null) {
+        server.stop(0);
+      }
+    }
+  }
 }
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index c634bb99e02..ba4ac276994 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -2811,14 +2811,23 @@ public class ElasticsearchIO {
       }
 
       private boolean isRetryableClientException(Throwable t) {
-        // RestClient#performRequest only throws wrapped IOException so we must inspect the
+        // RestClient#performRequest mainly throws wrapped IOException so we must inspect the
         // exception cause to determine if the exception is likely transient i.e. retryable or
-        // not.
+        // not. One exception is the ResponseException that is thrown when attempting to parse the
+        // response. This exception is not wrapped.
+
+        // ResponseException should not be wrapped, but check the cause to be safe for future
+        // changes
+        ResponseException re = null;
+        if (t instanceof ResponseException) {
+          re = (ResponseException) t;
+        } else if (t.getCause() instanceof ResponseException) {
+          re = (ResponseException) t.getCause();
+        }
 
         // Retry for 500-range response code except for 501.
-        if (t.getCause() instanceof ResponseException) {
-          ResponseException ex = (ResponseException) t.getCause();
-          int statusCode = ex.getResponse().getStatusLine().getStatusCode();
+        if (re != null) {
+          int statusCode = re.getResponse().getStatusLine().getStatusCode();
           return statusCode >= 500 && statusCode != 501;
         }
         return t.getCause() instanceof ConnectTimeoutException
@@ -2893,7 +2902,16 @@ public class ElasticsearchIO {
               && spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
             LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS.");
           }
-          responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
+          try {
+            responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
+          } catch (java.io.IOException ex) {
+            // No more retry attempts, determine what to do using throwWriteErrors
+            if (spec.getThrowWriteErrors()) {
+              throw ex;
+            } else {
+              elasticResponseExceptionMessage = ex.getMessage();
+            }
+          }
         }
 
         List<Document> responses;

Reply via email to