This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/connection-retry by this push:
new 4a2f32edd8b fix
4a2f32edd8b is described below
commit 4a2f32edd8b092a8a934968efb96a979f8364ed1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 16:25:37 2026 +0800
fix
---
.../apache/iotdb/commons/conf/CommonConfig.java | 23 +++++++++++++---------
.../task/subtask/PipeAbstractSinkSubtask.java | 3 ++-
.../iotdb/commons/pipe/config/PipeConfig.java | 2 +-
.../iotdb/commons/pipe/config/PipeDescriptor.java | 8 ++++----
4 files changed, 21 insertions(+), 15 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 73673bb3136..8ed5b73bcd9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -270,7 +270,8 @@ public class CommonConfig {
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
- private long pipeConnectorRetryIntervalMs = 1000L;
+ private long pipeSinkRetryIntervalMs = 1000L;
+ private boolean pipeSinkRetryLocallyForConnectionError = true;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
@@ -1105,7 +1106,7 @@ public class CommonConfig {
return isPipeConnectorReadFileBufferMemoryControlEnabled;
}
- public void setIsPipeConnectorReadFileBufferMemoryControlEnabled(
+ public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
== isPipeConnectorReadFileBufferMemoryControlEnabled) {
@@ -1118,7 +1119,7 @@ public class CommonConfig {
isPipeConnectorReadFileBufferMemoryControlEnabled);
}
- public void setPipeConnectorRPCThriftCompressionEnabled(
+ public void setPipeSinkRPCThriftCompressionEnabled(
boolean pipeConnectorRPCThriftCompressionEnabled) {
if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
== pipeConnectorRPCThriftCompressionEnabled) {
@@ -1355,16 +1356,20 @@ public class CommonConfig {
logger.info("pipeAutoRestartEnabled is set to {}.",
pipeAutoRestartEnabled);
}
- public long getPipeConnectorRetryIntervalMs() {
- return pipeConnectorRetryIntervalMs;
+ public long getPipeSinkRetryIntervalMs() {
+ return pipeSinkRetryIntervalMs;
}
- public void setPipeConnectorRetryIntervalMs(long
pipeConnectorRetryIntervalMs) {
- if (this.pipeConnectorRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+ public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
+ if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
return;
}
- this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
- logger.info("pipeConnectorRetryIntervalMs is set to {}",
pipeConnectorRetryIntervalMs);
+ this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
+ logger.info("pipeSinkRetryIntervalMs is set to {}",
pipeConnectorRetryIntervalMs);
+ }
+
+ public boolean isPipeSinkRetryLocallyForConnectionError() {
+ return pipeSinkRetryLocallyForConnectionError;
}
public int
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index af75e6424f7..c57a1fa9b33 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -177,7 +177,8 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
e);
try {
sleepIfNoHighPriorityTask(
- retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+ Math.min(retry, MAX_RETRY_TIMES)
+ *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to handshake with the
target system.",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1ed1e39911f..ca43b58e368 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -184,7 +184,7 @@ public class PipeConfig {
}
public long getPipeConnectorRetryIntervalMs() {
- return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
+ return COMMON_CONFIG.getPipeSinkRetryIntervalMs();
}
public boolean isPipeConnectorRPCThriftCompressionEnabled() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 2aeab95972d..8a18650b0bb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -348,7 +348,7 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_connector_read_file_buffer_size",
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
- config.setIsPipeConnectorReadFileBufferMemoryControlEnabled(
+ config.setIsPipeSinkReadFileBufferMemoryControlEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
.orElse(
@@ -356,14 +356,14 @@ public class PipeDescriptor {
"pipe_connector_read_file_buffer_memory_control",
String.valueOf(
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
- config.setPipeConnectorRetryIntervalMs(
+ config.setPipeSinkRetryIntervalMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
.orElse(
properties.getProperty(
"pipe_connector_retry_interval_ms",
-
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
- config.setPipeConnectorRPCThriftCompressionEnabled(
+
String.valueOf(config.getPipeSinkRetryIntervalMs())))));
+ config.setPipeSinkRPCThriftCompressionEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
.orElse(