This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/connection-retry by this push:
     new 50ecb35ad5c may-fix
50ecb35ad5c is described below

commit 50ecb35ad5ccd9e8b0e83f5ae9c69a4dcd94526a
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 16:52:14 2026 +0800

    may-fix
---
 .../iotdb/tool/tsfile/ImportTsFileRemotely.java    |   9 +-
 .../exchange/sender/TwoStageAggregateSender.java   |   5 +-
 .../protocol/airgap/IoTDBAirGapReceiver.java       |   2 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    |   4 +-
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  |   2 +-
 .../pipeconsensus/PipeConsensusSyncSink.java       |   2 +-
 .../PipeConsensusTsFileInsertionEventHandler.java  |   2 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |   2 +-
 .../async/handler/PipeTransferTsFileHandler.java   |   6 +-
 .../GeneralRegionAttributeSecurityService.java     |   4 +-
 .../iotdb/commons/client/ClientPoolFactory.java    |  20 ++--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 108 +++++++++++----------
 .../task/subtask/PipeAbstractSinkSubtask.java      |  10 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  93 +++++++++---------
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  28 +++---
 .../pipe/sink/client/IoTDBClientManager.java       |   2 +-
 .../commons/pipe/sink/client/IoTDBSyncClient.java  |   2 +-
 .../pipe/sink/client/IoTDBSyncClientManager.java   |   4 +-
 .../pipe/sink/limiter/GlobalRPCRateLimiter.java    |   2 +-
 .../pipe/sink/protocol/IoTDBAirGapSink.java        |   4 +-
 .../pipe/sink/protocol/IoTDBSslSyncSink.java       |   2 +-
 21 files changed, 159 insertions(+), 154 deletions(-)

diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index efb7f2abdbf..aca33ba4dbf 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -164,7 +164,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
                 "Handshake error with target server ip: %s, port: %s, because: 
%s.",
                 client.getIpAddress(), client.getPort(), resp.getStatus()));
       } else {
-        
client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+        
client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
         IOT_PRINTER.println(
             String.format(
                 "Handshake success. Target server ip: %s, port: %s",
@@ -232,7 +232,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
 
   private void transferFilePieces(final File file, final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
@@ -299,10 +299,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
       this.client =
           new IoTDBSyncClient(
               new ThriftClientProperty.Builder()
-                  .setConnectionTimeoutMs(
-                      
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
+                  
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
-                      
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
+                      
PipeConfig.getInstance().isPipeSinkRPCThriftCompressionEnabled())
                   .build(),
               getEndPoint().getIp(),
               getEndPoint().getPort(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index bac357368c0..3c36559a300 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,9 +208,8 @@ public class TwoStageAggregateSender implements AutoCloseable {
   private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws 
TTransportException {
     return new IoTDBSyncClient(
         new ThriftClientProperty.Builder()
-            
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
-            .setRpcThriftCompressionEnabled(
-                PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+            
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
+            
.setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
             .build(),
         endPoint.getIp(),
         endPoint.getPort(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 3cea6c998f8..610b9e5fe1a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -70,7 +70,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
   @Override
   public void runMayThrow() throws Throwable {
-    
socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+    
socket.setSoTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
     socket.setKeepAlive(true);
 
     LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, 
socket);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5ee7f22bc07..73e2213eea6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -312,7 +312,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF,
           Boolean.toString(skipIfNoPrivileges));
 
-      
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+      
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
       waitHandshakeFinished(isHandshakeFinished);
 
@@ -331,7 +331,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
         resp.set(null);
         exception.set(null);
 
-        
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+        
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
         client.pipeTransfer(
             PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
                 
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 332ca6bab7c..e296a7e0faa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -389,7 +389,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
     long position = 0;
 
     // Try small piece to rebase the file position.
-    final byte[] buffer = new 
byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()];
+    final byte[] buffer = new 
byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()];
     try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"r")) {
       while (true) {
         final int dataLength = randomAccessFile.read(buffer);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
index bb0dcf2dc09..50972ee4454 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
@@ -446,7 +446,7 @@ public class PipeConsensusSyncSink extends IoTDBSink {
       final TCommitId tCommitId,
       final TConsensusGroupId tConsensusGroupId)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 485096173c1..630262d4485 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -104,7 +104,7 @@ public class PipeConsensusTsFileInsertionEventHandler
     transferMod = event.isWithMod();
     currentFile = transferMod ? modFile : tsFile;
 
-    readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     readBuffer = new byte[readFileBufferSize];
     position = 0;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index e6e368a5280..95e36ee923f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -599,7 +599,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
       // Stop retrying if the execution time exceeds the threshold for better 
realtime performance
       if (System.currentTimeMillis() - retryStartTime
-          > 
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) 
{
+          > 
PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) {
         if (retryEventQueueEventCounter.getTabletInsertionEventCount()
                 < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
             && retryEventQueueEventCounter.getTsFileInsertionEventCount()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3ef46034554..67fa5c9bd8c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -126,7 +126,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     readFileBufferSize =
         (int)
             Math.min(
-                PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(),
+                PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
                 transferMod ? Math.max(tsFile.length(), modFile.length()) : 
tsFile.length());
     position = 0;
 
@@ -146,7 +146,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
       memoryBlock =
           PipeDataNodeResourceManager.memory()
               .forceAllocateForTsFileWithRetry(
-                  
PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()
+                  
PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()
                       ? readFileBufferSize
                       : 0);
       readBuffer = new byte[readFileBufferSize];
@@ -477,7 +477,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
    * @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
    */
   private void waitForResourceEnough4Slicing(final long timeoutMs) throws 
InterruptedException {
-    if 
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) 
{
+    if 
(!PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()) {
       return;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
index 1bff56500ea..408cc121590 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
@@ -102,9 +102,7 @@ public class GeneralRegionAttributeSecurityService extends AbstractPeriodicalSer
     // UpdateClearContainer and version / TEndPoint are not calculated
     final AtomicInteger limit =
         new AtomicInteger(
-            CommonDescriptor.getInstance()
-                .getConfig()
-                .getPipeConnectorRequestSliceThresholdBytes());
+            
CommonDescriptor.getInstance().getConfig().getPipeSinkRequestSliceThresholdBytes());
 
     final AtomicBoolean hasRemaining = new AtomicBoolean(false);
     final Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> 
attributeUpdateCommitMap =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 7ed34359b21..9e41b585ec4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -281,15 +281,13 @@ public class ClientPoolFactory {
               new AsyncPipeDataTransferServiceClient.Factory(
                   manager,
                   new ThriftClientProperty.Builder()
-                      
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
-                      .setRpcThriftCompressionEnabled(
-                          conf.isPipeConnectorRPCThriftCompressionEnabled())
-                      .setSelectorNumOfAsyncClientManager(
-                          conf.getPipeAsyncConnectorSelectorNumber())
+                      
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+                      
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+                      
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       .build(),
                   ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-                  
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
+                  
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
@@ -308,16 +306,14 @@ public class ClientPoolFactory {
               new AsyncPipeDataTransferServiceClient.Factory(
                   manager,
                   new ThriftClientProperty.Builder()
-                      
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
-                      .setRpcThriftCompressionEnabled(
-                          conf.isPipeConnectorRPCThriftCompressionEnabled())
-                      .setSelectorNumOfAsyncClientManager(
-                          conf.getPipeAsyncConnectorSelectorNumber())
+                      
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+                      
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+                      
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
                       .build(),
                   ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-                  
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+                  
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 8ed5b73bcd9..fdc1bf7007d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -266,13 +266,13 @@ public class CommonConfig {
   private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
   private long pipeSourceMatcherCacheSize = 1024;
 
-  private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
-  private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
-  private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
-  private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
+  private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+  private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
+  private int pipeSinkReadFileBufferSize = 5242880; // 5MB
+  private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
   private long pipeSinkRetryIntervalMs = 1000L;
   private boolean pipeSinkRetryLocallyForConnectionError = true;
-  private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+  private boolean pipeSinkRPCThriftCompressionEnabled = false;
 
   private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
   private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
@@ -1051,88 +1051,85 @@ public class CommonConfig {
     logger.info("pipeSourceMatcherCacheSize is set to {}.", 
pipeSourceMatcherCacheSize);
   }
 
-  public int getPipeConnectorHandshakeTimeoutMs() {
-    return pipeConnectorHandshakeTimeoutMs;
+  public int getPipeSinkHandshakeTimeoutMs() {
+    return pipeSinkHandshakeTimeoutMs;
   }
 
-  public void setPipeConnectorHandshakeTimeoutMs(long 
pipeConnectorHandshakeTimeoutMs) {
-    final int fPipeConnectorHandshakeTimeoutMs = 
this.pipeConnectorHandshakeTimeoutMs;
+  public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
+    final int fPipeConnectorHandshakeTimeoutMs = 
this.pipeSinkHandshakeTimeoutMs;
     try {
-      this.pipeConnectorHandshakeTimeoutMs = 
Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+      this.pipeSinkHandshakeTimeoutMs = 
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
     } catch (ArithmeticException e) {
-      this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+      this.pipeSinkHandshakeTimeoutMs = Integer.MAX_VALUE;
       logger.warn(
           "Given pipe connector handshake timeout is too large, set to {} 
ms.", Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorHandshakeTimeoutMs != 
this.pipeConnectorHandshakeTimeoutMs) {
+      if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) 
{
         logger.info(
-            "pipeConnectorHandshakeTimeoutMs is set to {}.", 
this.pipeConnectorHandshakeTimeoutMs);
+            "pipeConnectorHandshakeTimeoutMs is set to {}.", 
this.pipeSinkHandshakeTimeoutMs);
       }
     }
   }
 
-  public int getPipeConnectorTransferTimeoutMs() {
-    return pipeConnectorTransferTimeoutMs;
+  public int getPipeSinkTransferTimeoutMs() {
+    return pipeSinkTransferTimeoutMs;
   }
 
-  public void setPipeConnectorTransferTimeoutMs(long 
pipeConnectorTransferTimeoutMs) {
-    final int fPipeConnectorTransferTimeoutMs = 
this.pipeConnectorTransferTimeoutMs;
+  public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
+    final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
     try {
-      this.pipeConnectorTransferTimeoutMs = 
Math.toIntExact(pipeConnectorTransferTimeoutMs);
+      this.pipeSinkTransferTimeoutMs = 
Math.toIntExact(pipeSinkTransferTimeoutMs);
     } catch (ArithmeticException e) {
-      this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+      this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
       logger.warn(
           "Given pipe connector transfer timeout is too large, set to {} ms.", 
Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorTransferTimeoutMs != 
this.pipeConnectorTransferTimeoutMs) {
-        logger.info("pipeConnectorTransferTimeoutMs is set to {}.", 
pipeConnectorTransferTimeoutMs);
+      if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+        logger.info("pipeConnectorTransferTimeoutMs is set to {}.", 
pipeSinkTransferTimeoutMs);
       }
     }
   }
 
-  public int getPipeConnectorReadFileBufferSize() {
-    return pipeConnectorReadFileBufferSize;
+  public int getPipeSinkReadFileBufferSize() {
+    return pipeSinkReadFileBufferSize;
   }
 
-  public void setPipeConnectorReadFileBufferSize(int 
pipeConnectorReadFileBufferSize) {
-    if (this.pipeConnectorReadFileBufferSize == 
pipeConnectorReadFileBufferSize) {
+  public void setPipeSinkReadFileBufferSize(int pipeSinkReadFileBufferSize) {
+    if (this.pipeSinkReadFileBufferSize == pipeSinkReadFileBufferSize) {
       return;
     }
-    this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
-    logger.info("pipeConnectorReadFileBufferSize is set to {}.", 
pipeConnectorReadFileBufferSize);
+    this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
+    logger.info("pipeConnectorReadFileBufferSize is set to {}.", 
pipeSinkReadFileBufferSize);
   }
 
-  public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
-    return isPipeConnectorReadFileBufferMemoryControlEnabled;
+  public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+    return isPipeSinkReadFileBufferMemoryControlEnabled;
   }
 
   public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
-      boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
-    if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
-        == isPipeConnectorReadFileBufferMemoryControlEnabled) {
+      boolean isPipeSinkReadFileBufferMemoryControlEnabled) {
+    if (this.isPipeSinkReadFileBufferMemoryControlEnabled
+        == isPipeSinkReadFileBufferMemoryControlEnabled) {
       return;
     }
-    this.isPipeConnectorReadFileBufferMemoryControlEnabled =
-        isPipeConnectorReadFileBufferMemoryControlEnabled;
+    this.isPipeSinkReadFileBufferMemoryControlEnabled =
+        isPipeSinkReadFileBufferMemoryControlEnabled;
     logger.info(
-        "isPipeConnectorReadFileBufferMemoryControlEnabled is set to {}.",
-        isPipeConnectorReadFileBufferMemoryControlEnabled);
+        "isPipeSinkReadFileBufferMemoryControlEnabled is set to {}.",
+        isPipeSinkReadFileBufferMemoryControlEnabled);
   }
 
-  public void setPipeSinkRPCThriftCompressionEnabled(
-      boolean pipeConnectorRPCThriftCompressionEnabled) {
-    if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
-        == pipeConnectorRPCThriftCompressionEnabled) {
+  public void setPipeSinkRPCThriftCompressionEnabled(boolean 
pipeSinkRPCThriftCompressionEnabled) {
+    if (this.isPipeSinkReadFileBufferMemoryControlEnabled == 
pipeSinkRPCThriftCompressionEnabled) {
       return;
     }
-    this.pipeConnectorRPCThriftCompressionEnabled = 
pipeConnectorRPCThriftCompressionEnabled;
+    this.pipeSinkRPCThriftCompressionEnabled = 
pipeSinkRPCThriftCompressionEnabled;
     logger.info(
-        "pipeConnectorRPCThriftCompressionEnabled is set to {}.",
-        pipeConnectorRPCThriftCompressionEnabled);
+        "pipeSinkRPCThriftCompressionEnabled is set to {}.", 
pipeSinkRPCThriftCompressionEnabled);
   }
 
-  public boolean isPipeConnectorRPCThriftCompressionEnabled() {
-    return pipeConnectorRPCThriftCompressionEnabled;
+  public boolean isPipeSinkRPCThriftCompressionEnabled() {
+    return pipeSinkRPCThriftCompressionEnabled;
   }
 
   public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
@@ -1198,11 +1195,11 @@ public class CommonConfig {
         pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
   }
 
-  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+  public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
     return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
   }
 
-  public int getPipeAsyncConnectorSelectorNumber() {
+  public int getPipeAsyncSinkSelectorNumber() {
     return pipeAsyncConnectorSelectorNumber;
   }
 
@@ -1220,7 +1217,7 @@ public class CommonConfig {
     logger.info("pipeAsyncConnectorSelectorNumber is set to {}.", 
pipeAsyncConnectorSelectorNumber);
   }
 
-  public int getPipeAsyncConnectorMaxClientNumber() {
+  public int getPipeAsyncSinkMaxClientNumber() {
     return pipeAsyncConnectorMaxClientNumber;
   }
 
@@ -1239,7 +1236,7 @@ public class CommonConfig {
         "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxClientNumber);
   }
 
-  public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+  public int getPipeAsyncSinkMaxTsFileClientNumber() {
     return pipeAsyncConnectorMaxTsFileClientNumber;
   }
 
@@ -1372,6 +1369,17 @@ public class CommonConfig {
     return pipeSinkRetryLocallyForConnectionError;
   }
 
+  public void setPipeSinkRetryLocallyForConnectionError(
+      boolean pipeSinkRetryLocallyForConnectionError) {
+    if (this.pipeSinkRetryLocallyForConnectionError == 
pipeSinkRetryLocallyForConnectionError) {
+      return;
+    }
+    this.pipeSinkRetryLocallyForConnectionError = 
pipeSinkRetryLocallyForConnectionError;
+    logger.info(
+        "pipeSinkRetryLocallyForConnectionError is set to {}",
+        pipeSinkRetryLocallyForConnectionError);
+  }
+
   public int 
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
     return pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount;
   }
@@ -2076,7 +2084,7 @@ public class CommonConfig {
         "rateLimiterHotReloadCheckIntervalMs is set to {}", 
rateLimiterHotReloadCheckIntervalMs);
   }
 
-  public int getPipeConnectorRequestSliceThresholdBytes() {
+  public int getPipeSinkRequestSliceThresholdBytes() {
     return pipeConnectorRequestSliceThresholdBytes;
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index c57a1fa9b33..760d3c2ffeb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -159,7 +159,8 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
         throwable);
 
     int retry = 0;
-    while (retry < MAX_RETRY_TIMES) {
+    while (retry < MAX_RETRY_TIMES
+        || 
PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
       try {
         outputPipeSink.handshake();
         LOGGER.info(
@@ -170,15 +171,16 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
         retry++;
         LOGGER.warn(
             "{} failed to handshake with the target system for {} times, "
-                + "will retry at most {} times.",
+                + 
(PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()
+                    ? "will retry forever."
+                    : String.format("will retry at most %s times.", 
MAX_RETRY_TIMES)),
             outputPipeSink.getClass().getName(),
             retry,
-            MAX_RETRY_TIMES,
             e);
         try {
           sleepIfNoHighPriorityTask(
               Math.min(retry, MAX_RETRY_TIMES)
-                  * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+                  * PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
         } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to handshake with the 
target system.",
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ca43b58e368..69918b59fac 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -103,7 +103,7 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMinimumReceiverMemory();
   }
 
-  /////////////////////////////// Subtask Connector 
///////////////////////////////
+  /////////////////////////////// Subtask Sink ///////////////////////////////
 
   public int getPipeRealTimeQueuePollTsFileThreshold() {
     return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
@@ -165,30 +165,34 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
   }
 
-  /////////////////////////////// Connector ///////////////////////////////
+  /////////////////////////////// Sink ///////////////////////////////
 
-  public int getPipeConnectorHandshakeTimeoutMs() {
-    return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
+  public int getPipeSinkHandshakeTimeoutMs() {
+    return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
   }
 
-  public int getPipeConnectorTransferTimeoutMs() {
-    return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
+  public int getPipeSinkTransferTimeoutMs() {
+    return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
   }
 
-  public int getPipeConnectorReadFileBufferSize() {
-    return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
+  public int getPipeSinkReadFileBufferSize() {
+    return COMMON_CONFIG.getPipeSinkReadFileBufferSize();
   }
 
-  public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
-    return COMMON_CONFIG.isPipeConnectorReadFileBufferMemoryControlEnabled();
+  public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+    return COMMON_CONFIG.isPipeSinkReadFileBufferMemoryControlEnabled();
   }
 
-  public long getPipeConnectorRetryIntervalMs() {
+  public long getPipeSinkRetryIntervalMs() {
     return COMMON_CONFIG.getPipeSinkRetryIntervalMs();
   }
 
-  public boolean isPipeConnectorRPCThriftCompressionEnabled() {
-    return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
+  public boolean isPipeSinkRetryLocallyForConnectionError() {
+    return COMMON_CONFIG.isPipeSinkRetryLocallyForConnectionError();
+  }
+
+  public boolean isPipeSinkRPCThriftCompressionEnabled() {
+    return COMMON_CONFIG.isPipeSinkRPCThriftCompressionEnabled();
   }
 
   public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
@@ -203,27 +207,27 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
   }
 
-  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+  public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall();
   }
 
-  public int getPipeAsyncConnectorSelectorNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
+  public int getPipeAsyncSinkSelectorNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkSelectorNumber();
   }
 
-  public int getPipeAsyncConnectorMaxClientNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
+  public int getPipeAsyncSinkMaxClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxClientNumber();
   }
 
-  public int getPipeAsyncConnectorMaxTsFileClientNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+  public int getPipeAsyncSinkMaxTsFileClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxTsFileClientNumber();
   }
 
   public double getPipeSendTsFileRateLimitBytesPerSecond() {
     return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
   }
 
-  public double getPipeAllConnectorsRateLimitBytesPerSecond() {
+  public double getPipeAllSinksRateLimitBytesPerSecond() {
     return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
 
@@ -231,8 +235,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
   }
 
-  public int getPipeConnectorRequestSliceThresholdBytes() {
-    return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
+  public int getPipeSinkRequestSliceThresholdBytes() {
+    return COMMON_CONFIG.getPipeSinkRequestSliceThresholdBytes();
   }
 
   public long getPipeMaxReaderChunkSize() {
@@ -503,16 +507,14 @@ public class PipeConfig {
         getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
     LOGGER.info("PipeSourceMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
 
-    LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeConnectorHandshakeTimeoutMs());
-    LOGGER.info("PipeConnectorTransferTimeoutMs: {}", 
getPipeConnectorTransferTimeoutMs());
-    LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
-    LOGGER.info(
-        "PipeConnectorReadFileBufferMemoryControlEnabled: {}",
-        isPipeConnectorReadFileBufferMemoryControlEnabled());
-    LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
+    LOGGER.info("PipeSinkHandshakeTimeoutMs: {}", 
getPipeSinkHandshakeTimeoutMs());
+    LOGGER.info("PipeSinkTransferTimeoutMs: {}", 
getPipeSinkTransferTimeoutMs());
+    LOGGER.info("PipeSinkReadFileBufferSize: {}", 
getPipeSinkReadFileBufferSize());
     LOGGER.info(
-        "PipeConnectorRPCThriftCompressionEnabled: {}",
-        isPipeConnectorRPCThriftCompressionEnabled());
+        "PipeSinkReadFileBufferMemoryControlEnabled: {}",
+        isPipeSinkReadFileBufferMemoryControlEnabled());
+    LOGGER.info("PipeSinkRetryIntervalMs: {}", getPipeSinkRetryIntervalMs());
+    LOGGER.info("PipeSinkRPCThriftCompressionEnabled: {}", 
isPipeSinkRPCThriftCompressionEnabled());
     LOGGER.info(
         "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
     LOGGER.info("PipeMaxAlignedSeriesChunkSizeInOneBatch: {}", 
getPipeMaxReaderChunkSize());
@@ -550,34 +552,29 @@ public class PipeConfig {
         getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
 
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
+        "PipeAsyncSinkForcedRetryTsFileEventQueueSizeThreshold: {}",
         getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
+        "PipeAsyncSinkForcedRetryTabletEventQueueSizeThreshold: {}",
         getPipeAsyncSinkForcedRetryTabletEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
+        "PipeAsyncSinkForcedRetryTotalEventQueueSizeThreshold: {}",
         getPipeAsyncSinkForcedRetryTotalEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
-        getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
-    LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
-    LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
-    LOGGER.info(
-        "PipeAsyncConnectorMaxTsFileClientNumber: {}",
-        getPipeAsyncConnectorMaxTsFileClientNumber());
+        "PipeAsyncSinkMaxRetryExecutionTimeMsPerCall: {}",
+        getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall());
+    LOGGER.info("PipeAsyncSinkSelectorNumber: {}", 
getPipeAsyncSinkSelectorNumber());
+    LOGGER.info("PipeAsyncSinkMaxClientNumber: {}", 
getPipeAsyncSinkMaxClientNumber());
+    LOGGER.info("PipeAsyncSinkMaxTsFileClientNumber: {}", 
getPipeAsyncSinkMaxTsFileClientNumber());
 
     LOGGER.info(
         "PipeSendTsFileRateLimitBytesPerSecond: {}", 
getPipeSendTsFileRateLimitBytesPerSecond());
     LOGGER.info(
-        "PipeAllConnectorsRateLimitBytesPerSecond: {}",
-        getPipeAllConnectorsRateLimitBytesPerSecond());
+        "PipeAllSinksRateLimitBytesPerSecond: {}", 
getPipeAllSinksRateLimitBytesPerSecond());
     LOGGER.info(
         "RateLimiterHotReloadCheckIntervalMs: {}", 
getRateLimiterHotReloadCheckIntervalMs());
 
-    LOGGER.info(
-        "PipeConnectorRequestSliceThresholdBytes: {}",
-        getPipeConnectorRequestSliceThresholdBytes());
+    LOGGER.info("PipeSinkRequestSliceThresholdBytes: {}", 
getPipeSinkRequestSliceThresholdBytes());
 
     LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", 
isSeperatedPipeHeartbeatEnabled());
     LOGGER.info(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 8a18650b0bb..f4ed2cbd9a7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -334,28 +334,27 @@ public class PipeDescriptor {
                         "pipe_extractor_matcher_cache_size",
                         
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
 
-    config.setPipeConnectorHandshakeTimeoutMs(
+    config.setPipeSinkHandshakeTimeoutMs(
         Long.parseLong(
             
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_handshake_timeout_ms",
-                        
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
-    config.setPipeConnectorReadFileBufferSize(
+                        
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+    config.setPipeSinkReadFileBufferSize(
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_read_file_buffer_size",
-                        
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
+                        
String.valueOf(config.getPipeSinkReadFileBufferSize())))));
     config.setIsPipeSinkReadFileBufferMemoryControlEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_read_file_buffer_memory_control",
-                        String.valueOf(
-                            
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
+                        
String.valueOf(config.isPipeSinkReadFileBufferMemoryControlEnabled())))));
     config.setPipeSinkRetryIntervalMs(
         Long.parseLong(
             
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
@@ -363,13 +362,21 @@ public class PipeDescriptor {
                     properties.getProperty(
                         "pipe_connector_retry_interval_ms",
                         
String.valueOf(config.getPipeSinkRetryIntervalMs())))));
+    config.setPipeSinkRetryLocallyForConnectionError(
+        Boolean.parseBoolean(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_sink_retry_locally_for_connection_error"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_retry_locally_for_connection_error",
+                        
String.valueOf(config.isPipeSinkRetryLocallyForConnectionError())))));
     config.setPipeSinkRPCThriftCompressionEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_rpc_thrift_compression_enabled",
-                        
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+                        
String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
     config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
         Long.parseLong(
             Optional.ofNullable(
@@ -377,8 +384,7 @@ public class PipeDescriptor {
                 .orElse(
                     properties.getProperty(
                         
"pipe_async_connector_max_retry_execution_time_ms_per_call",
-                        String.valueOf(
-                            
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
+                        
String.valueOf(config.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall())))));
     config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
@@ -415,7 +421,7 @@ public class PipeDescriptor {
         Integer.parseInt(
             properties.getProperty(
                 "pipe_connector_request_slice_threshold_bytes",
-                
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
+                
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
 
     config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
         Long.parseLong(
@@ -565,7 +571,7 @@ public class PipeDescriptor {
         parserPipeConfig(
             properties, "pipe_sink_timeout_ms", "pipe_connector_timeout_ms", 
isHotModify);
     if (value != null) {
-      config.setPipeConnectorTransferTimeoutMs(Long.parseLong(value));
+      config.setPipeSinkTransferTimeoutMs(Long.parseLong(value));
     }
 
     value =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
index 3e009b279d7..1f76f5d2453 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
@@ -58,7 +58,7 @@ public abstract class IoTDBClientManager {
   private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 
1 day
   private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; 
// 6 hours
   protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
-      new 
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+      new 
AtomicInteger(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
 
   protected IoTDBClientManager(
       final List<TEndPoint> endPointList,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index a0bb65b5a7e..b7f42295e6c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -100,7 +100,7 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
 
   @Override
   public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws 
TException {
-    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes();
+    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
     if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
         || req.body.limit() < bodySizeLimit) {
       return super.pipeTransfer(req);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 4c5d51f83c5..76c145d0dab 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -198,9 +198,9 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       clientAndStatus.setLeft(
           new IoTDBSyncClient(
               new ThriftClientProperty.Builder()
-                  
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+                  
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
-                      PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+                      PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
                   .build(),
               endPoint.getIp(),
               endPoint.getPort(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
index 9a6aba5b90b..e497656eab2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
@@ -28,6 +28,6 @@ public class GlobalRPCRateLimiter extends GlobalRateLimiter {
 
   @Override
   protected double getThroughputBytesPerSecond() {
-    return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+    return CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index aba6c7e1d82..7d84e3bb98f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -242,7 +242,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
     } else {
       supportModsIfIsDataNodeReceiver = true;
     }
-    socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+    socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
     LOGGER.info("Handshake success. Socket: {}", socket);
   }
 
@@ -269,7 +269,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
       final AirGapSocket socket,
       final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
index b6b8e52f1fa..75a4607a23b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
@@ -184,7 +184,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
       final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
       final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {

Reply via email to