This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 02ef38eb7 [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism (#3148)
02ef38eb7 is described below

commit 02ef38eb7adf75e28ffb727296b4db90e78a684d
Author: Harvey Yue <[email protected]>
AuthorDate: Mon Nov 21 21:16:58 2022 +0800

    [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism (#3148)
---
 .../apache/seatunnel/common/utils/RetryUtils.java  | 41 ++++++++++++++++++--
 .../sink/ElasticsearchSinkWriter.java              | 45 +++++++++++-----------
 2 files changed, 61 insertions(+), 25 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
index fe0f3985c..0d2132397 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.common.utils;
 
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
 public class RetryUtils {
 
     /**
@@ -46,11 +51,19 @@ public class RetryUtils {
                     if (retryMaterial.shouldThrowException()) {
                         throw e;
                     }
-                } else if (retryMaterial.getSleepTimeMillis() > 0) {
-                    Thread.sleep(retryMaterial.getSleepTimeMillis());
+                } else {
+                    // Otherwise it is retriable and we should retry
+                    String attemptMessage = "Failed to execute due to {}. 
Retrying attempt ({}/{}) after backoff of {} ms";
+                    if (retryMaterial.getSleepTimeMillis() > 0) {
+                        long backoff = 
retryMaterial.computeRetryWaitTimeMillis(i);
+                        log.warn(attemptMessage, e.getCause(), i, retryTimes, 
backoff);
+                        Thread.sleep(backoff);
+                    } else {
+                        log.warn(attemptMessage, e.getCause(), i, retryTimes, 
0);
+                    }
                 }
             }
-        } while (i <= retryTimes);
+        } while (i < retryTimes);
         if (retryMaterial.shouldThrowException()) {
             throw new RuntimeException("Execute given execution failed after 
retry " + retryTimes + " times", lastException);
         }
@@ -58,6 +71,16 @@ public class RetryUtils {
     }
 
     public static class RetryMaterial {
+        /**
+         * An arbitrary absolute maximum practical retry time.
+         */
+        public static final long MAX_RETRY_TIME_MS = 
TimeUnit.MINUTES.toMillis(1);
+
+        /**
+         * The maximum retry time.
+         */
+        public static final long MAX_RETRY_TIME = 32;
+
         /**
          * Retry times, if you set it to 1, the given execution will be 
executed twice.
          * Should be greater than 0.
@@ -102,6 +125,18 @@ public class RetryUtils {
         public long getSleepTimeMillis() {
             return sleepTimeMillis;
         }
+
+        public long computeRetryWaitTimeMillis(int retryAttempts) {
+            if (sleepTimeMillis < 0) {
+                return 0;
+            }
+            if (retryAttempts > MAX_RETRY_TIME) {
+                // This would overflow the exponential algorithm ...
+                return MAX_RETRY_TIME_MS;
+            }
+            long result = sleepTimeMillis << retryAttempts;
+            return result < 0L ? MAX_RETRY_TIME_MS : 
Math.min(MAX_RETRY_TIME_MS, result);
+        }
     }
 
     @FunctionalInterface
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 682afec58..251905c77 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -20,6 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.RetryUtils.RetryMaterial;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
@@ -49,21 +52,21 @@ public class ElasticsearchSinkWriter implements 
SinkWriter<SeaTunnelRow, Elastic
 
     private final int maxBatchSize;
 
-    private final int maxRetryCount;
-
     private final SeaTunnelRowSerializer seaTunnelRowSerializer;
     private final List<String> requestEsList;
     private EsRestClient esRestClient;
+    private RetryMaterial retryMaterial;
+    private static final long DEFAULT_SLEEP_TIME_MS = 200L;
 
     public ElasticsearchSinkWriter(
         SinkWriter.Context context,
         SeaTunnelRowType seaTunnelRowType,
         Config pluginConfig,
-        int maxBatchSize, int maxRetryCount,
+        int maxBatchSize,
+        int maxRetryCount,
         List<ElasticsearchSinkState> elasticsearchStates) {
         this.context = context;
         this.maxBatchSize = maxBatchSize;
-        this.maxRetryCount = maxRetryCount;
 
         IndexInfo indexInfo = new IndexInfo(pluginConfig);
         esRestClient = EsRestClient.createInstance(pluginConfig);
@@ -71,6 +74,8 @@ public class ElasticsearchSinkWriter implements 
SinkWriter<SeaTunnelRow, Elastic
         this.seaTunnelRowSerializer = new 
ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType);
 
         this.requestEsList = new ArrayList<>(maxBatchSize);
+        this.retryMaterial = new RetryMaterial(maxRetryCount, true,
+            exception -> true, DEFAULT_SLEEP_TIME_MS);
     }
 
     @Override
@@ -78,7 +83,7 @@ public class ElasticsearchSinkWriter implements 
SinkWriter<SeaTunnelRow, Elastic
         String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
         requestEsList.add(indexRequestRow);
         if (requestEsList.size() >= maxBatchSize) {
-            bulkEsWithRetry(this.esRestClient, this.requestEsList, 
maxRetryCount);
+            bulkEsWithRetry(this.esRestClient, this.requestEsList);
             requestEsList.clear();
         }
     }
@@ -92,31 +97,27 @@ public class ElasticsearchSinkWriter implements 
SinkWriter<SeaTunnelRow, Elastic
     public void abortPrepare() {
     }
 
-    public void bulkEsWithRetry(EsRestClient esRestClient, List<String> 
requestEsList, int maxRetry) {
-        for (int tryCnt = 1; tryCnt <= maxRetry; tryCnt++) {
-            if (requestEsList.size() > 0) {
-                String requestBody = String.join("\n", requestEsList) + "\n";
-                try {
+    public void bulkEsWithRetry(EsRestClient esRestClient, List<String> 
requestEsList) {
+        try {
+            RetryUtils.retryWithException(() -> {
+                if (requestEsList.size() > 0) {
+                    String requestBody = String.join("\n", requestEsList) + 
"\n";
                     BulkResponse bulkResponse = esRestClient.bulk(requestBody);
-                    if (!bulkResponse.isErrors()) {
-                        break;
-                    } else {
-                        throw new 
BulkElasticsearchException(bulkResponse.getResponse());
-                    }
-                } catch (Exception ex) {
-                    if (tryCnt == maxRetry) {
-                        throw new BulkElasticsearchException("bulk 
elasticsearch error,try count=%d", ex);
+                    if (bulkResponse.isErrors()) {
+                        throw new BulkElasticsearchException("bulk es error: " 
+ bulkResponse.getResponse());
                     }
-                    log.warn(String.format("bulk elasticsearch error,try 
count=%d", tryCnt), ex);
+                    return bulkResponse;
                 }
-
-            }
+                return null;
+            }, retryMaterial);
+        } catch (Exception e) {
+            throw new SeaTunnelException("ElasticSearch execute batch 
statement error", e);
         }
     }
 
     @Override
     public void close() throws IOException {
-        bulkEsWithRetry(this.esRestClient, this.requestEsList, maxRetryCount);
+        bulkEsWithRetry(this.esRestClient, this.requestEsList);
         esRestClient.close();
     }
 }

Reply via email to