This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 02ef38eb7 [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism (#3148)
02ef38eb7 is described below
commit 02ef38eb7adf75e28ffb727296b4db90e78a684d
Author: Harvey Yue <[email protected]>
AuthorDate: Mon Nov 21 21:16:58 2022 +0800
[Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism (#3148)
---
.../apache/seatunnel/common/utils/RetryUtils.java | 41 ++++++++++++++++++--
.../sink/ElasticsearchSinkWriter.java | 45 +++++++++++-----------
2 files changed, 61 insertions(+), 25 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
index fe0f3985c..0d2132397 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.common.utils;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
public class RetryUtils {
/**
@@ -46,11 +51,19 @@ public class RetryUtils {
if (retryMaterial.shouldThrowException()) {
throw e;
}
- } else if (retryMaterial.getSleepTimeMillis() > 0) {
- Thread.sleep(retryMaterial.getSleepTimeMillis());
+ } else {
+ // Otherwise it is retriable and we should retry
+ String attemptMessage = "Failed to execute due to {}.
Retrying attempt ({}/{}) after backoff of {} ms";
+ if (retryMaterial.getSleepTimeMillis() > 0) {
+ long backoff =
retryMaterial.computeRetryWaitTimeMillis(i);
+ log.warn(attemptMessage, e.getCause(), i, retryTimes,
backoff);
+ Thread.sleep(backoff);
+ } else {
+ log.warn(attemptMessage, e.getCause(), i, retryTimes,
0);
+ }
}
}
- } while (i <= retryTimes);
+ } while (i < retryTimes);
if (retryMaterial.shouldThrowException()) {
throw new RuntimeException("Execute given execution failed after
retry " + retryTimes + " times", lastException);
}
@@ -58,6 +71,16 @@ public class RetryUtils {
}
public static class RetryMaterial {
+ /**
+ * An arbitrary absolute maximum practical retry time.
+ */
+ public static final long MAX_RETRY_TIME_MS =
TimeUnit.MINUTES.toMillis(1);
+
+ /**
+ * The maximum retry time.
+ */
+ public static final long MAX_RETRY_TIME = 32;
+
/**
* Retry times, if you set it to 1, the given execution will be
executed twice.
* Should be greater than 0.
@@ -102,6 +125,18 @@ public class RetryUtils {
public long getSleepTimeMillis() {
return sleepTimeMillis;
}
+
+ public long computeRetryWaitTimeMillis(int retryAttempts) {
+ if (sleepTimeMillis < 0) {
+ return 0;
+ }
+ if (retryAttempts > MAX_RETRY_TIME) {
+ // This would overflow the exponential algorithm ...
+ return MAX_RETRY_TIME_MS;
+ }
+ long result = sleepTimeMillis << retryAttempts;
+ return result < 0L ? MAX_RETRY_TIME_MS :
Math.min(MAX_RETRY_TIME_MS, result);
+ }
}
@FunctionalInterface
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 682afec58..251905c77 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -20,6 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.RetryUtils.RetryMaterial;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
@@ -49,21 +52,21 @@ public class ElasticsearchSinkWriter implements
SinkWriter<SeaTunnelRow, Elastic
private final int maxBatchSize;
- private final int maxRetryCount;
-
private final SeaTunnelRowSerializer seaTunnelRowSerializer;
private final List<String> requestEsList;
private EsRestClient esRestClient;
+ private RetryMaterial retryMaterial;
+ private static final long DEFAULT_SLEEP_TIME_MS = 200L;
public ElasticsearchSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
Config pluginConfig,
- int maxBatchSize, int maxRetryCount,
+ int maxBatchSize,
+ int maxRetryCount,
List<ElasticsearchSinkState> elasticsearchStates) {
this.context = context;
this.maxBatchSize = maxBatchSize;
- this.maxRetryCount = maxRetryCount;
IndexInfo indexInfo = new IndexInfo(pluginConfig);
esRestClient = EsRestClient.createInstance(pluginConfig);
@@ -71,6 +74,8 @@ public class ElasticsearchSinkWriter implements
SinkWriter<SeaTunnelRow, Elastic
this.seaTunnelRowSerializer = new
ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType);
this.requestEsList = new ArrayList<>(maxBatchSize);
+ this.retryMaterial = new RetryMaterial(maxRetryCount, true,
+ exception -> true, DEFAULT_SLEEP_TIME_MS);
}
@Override
@@ -78,7 +83,7 @@ public class ElasticsearchSinkWriter implements
SinkWriter<SeaTunnelRow, Elastic
String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
requestEsList.add(indexRequestRow);
if (requestEsList.size() >= maxBatchSize) {
- bulkEsWithRetry(this.esRestClient, this.requestEsList,
maxRetryCount);
+ bulkEsWithRetry(this.esRestClient, this.requestEsList);
requestEsList.clear();
}
}
@@ -92,31 +97,27 @@ public class ElasticsearchSinkWriter implements
SinkWriter<SeaTunnelRow, Elastic
public void abortPrepare() {
}
- public void bulkEsWithRetry(EsRestClient esRestClient, List<String>
requestEsList, int maxRetry) {
- for (int tryCnt = 1; tryCnt <= maxRetry; tryCnt++) {
- if (requestEsList.size() > 0) {
- String requestBody = String.join("\n", requestEsList) + "\n";
- try {
+ public void bulkEsWithRetry(EsRestClient esRestClient, List<String>
requestEsList) {
+ try {
+ RetryUtils.retryWithException(() -> {
+ if (requestEsList.size() > 0) {
+ String requestBody = String.join("\n", requestEsList) +
"\n";
BulkResponse bulkResponse = esRestClient.bulk(requestBody);
- if (!bulkResponse.isErrors()) {
- break;
- } else {
- throw new
BulkElasticsearchException(bulkResponse.getResponse());
- }
- } catch (Exception ex) {
- if (tryCnt == maxRetry) {
- throw new BulkElasticsearchException("bulk
elasticsearch error,try count=%d", ex);
+ if (bulkResponse.isErrors()) {
+ throw new BulkElasticsearchException("bulk es error: "
+ bulkResponse.getResponse());
}
- log.warn(String.format("bulk elasticsearch error,try
count=%d", tryCnt), ex);
+ return bulkResponse;
}
-
- }
+ return null;
+ }, retryMaterial);
+ } catch (Exception e) {
+ throw new SeaTunnelException("ElasticSearch execute batch
statement error", e);
}
}
@Override
public void close() throws IOException {
- bulkEsWithRetry(this.esRestClient, this.requestEsList, maxRetryCount);
+ bulkEsWithRetry(this.esRestClient, this.requestEsList);
esRestClient.close();
}
}