This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/connection-retry by this push:
     new 4a2f32edd8b fix
4a2f32edd8b is described below

commit 4a2f32edd8b092a8a934968efb96a979f8364ed1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 16:25:37 2026 +0800

    fix
---
 .../apache/iotdb/commons/conf/CommonConfig.java    | 23 +++++++++++++---------
 .../task/subtask/PipeAbstractSinkSubtask.java      |  3 ++-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  2 +-
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  8 ++++----
 4 files changed, 21 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 73673bb3136..8ed5b73bcd9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -270,7 +270,8 @@ public class CommonConfig {
   private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
   private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
   private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
-  private long pipeConnectorRetryIntervalMs = 1000L;
+  private long pipeSinkRetryIntervalMs = 1000L;
+  private boolean pipeSinkRetryLocallyForConnectionError = true;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
   private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
@@ -1105,7 +1106,7 @@ public class CommonConfig {
     return isPipeConnectorReadFileBufferMemoryControlEnabled;
   }
 
-  public void setIsPipeConnectorReadFileBufferMemoryControlEnabled(
+  public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
       boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
     if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
         == isPipeConnectorReadFileBufferMemoryControlEnabled) {
@@ -1118,7 +1119,7 @@ public class CommonConfig {
         isPipeConnectorReadFileBufferMemoryControlEnabled);
   }
 
-  public void setPipeConnectorRPCThriftCompressionEnabled(
+  public void setPipeSinkRPCThriftCompressionEnabled(
       boolean pipeConnectorRPCThriftCompressionEnabled) {
     if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
         == pipeConnectorRPCThriftCompressionEnabled) {
@@ -1355,16 +1356,20 @@ public class CommonConfig {
     logger.info("pipeAutoRestartEnabled is set to {}.", 
pipeAutoRestartEnabled);
   }
 
-  public long getPipeConnectorRetryIntervalMs() {
-    return pipeConnectorRetryIntervalMs;
+  public long getPipeSinkRetryIntervalMs() {
+    return pipeSinkRetryIntervalMs;
   }
 
-  public void setPipeConnectorRetryIntervalMs(long 
pipeConnectorRetryIntervalMs) {
-    if (this.pipeConnectorRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+  public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
+    if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
       return;
     }
-    this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
-    logger.info("pipeConnectorRetryIntervalMs is set to {}", 
pipeConnectorRetryIntervalMs);
+    this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
+    logger.info("pipeSinkRetryIntervalMs is set to {}", 
pipeConnectorRetryIntervalMs);
+  }
+
+  public boolean isPipeSinkRetryLocallyForConnectionError() {
+    return pipeSinkRetryLocallyForConnectionError;
   }
 
   public int 
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index af75e6424f7..c57a1fa9b33 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -177,7 +177,8 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
             e);
         try {
           sleepIfNoHighPriorityTask(
-              retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+              Math.min(retry, MAX_RETRY_TIMES)
+                  * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
         } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to handshake with the 
target system.",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1ed1e39911f..ca43b58e368 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -184,7 +184,7 @@ public class PipeConfig {
   }
 
   public long getPipeConnectorRetryIntervalMs() {
-    return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
+    return COMMON_CONFIG.getPipeSinkRetryIntervalMs();
   }
 
   public boolean isPipeConnectorRPCThriftCompressionEnabled() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 2aeab95972d..8a18650b0bb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -348,7 +348,7 @@ public class PipeDescriptor {
                     properties.getProperty(
                         "pipe_connector_read_file_buffer_size",
                         
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
-    config.setIsPipeConnectorReadFileBufferMemoryControlEnabled(
+    config.setIsPipeSinkReadFileBufferMemoryControlEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
                 .orElse(
@@ -356,14 +356,14 @@ public class PipeDescriptor {
                         "pipe_connector_read_file_buffer_memory_control",
                         String.valueOf(
                             
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
-    config.setPipeConnectorRetryIntervalMs(
+    config.setPipeSinkRetryIntervalMs(
         Long.parseLong(
             
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_retry_interval_ms",
-                        
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
-    config.setPipeConnectorRPCThriftCompressionEnabled(
+                        
String.valueOf(config.getPipeSinkRetryIntervalMs())))));
+    config.setPipeSinkRPCThriftCompressionEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
                 .orElse(

Reply via email to