This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/connection-retry by this push:
new 50ecb35ad5c may-fix
50ecb35ad5c is described below
commit 50ecb35ad5ccd9e8b0e83f5ae9c69a4dcd94526a
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 16:52:14 2026 +0800
may-fix
---
.../iotdb/tool/tsfile/ImportTsFileRemotely.java | 9 +-
.../exchange/sender/TwoStageAggregateSender.java | 5 +-
.../protocol/airgap/IoTDBAirGapReceiver.java | 2 +-
.../client/IoTDBDataNodeAsyncClientManager.java | 4 +-
.../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 2 +-
.../pipeconsensus/PipeConsensusSyncSink.java | 2 +-
.../PipeConsensusTsFileInsertionEventHandler.java | 2 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 2 +-
.../async/handler/PipeTransferTsFileHandler.java | 6 +-
.../GeneralRegionAttributeSecurityService.java | 4 +-
.../iotdb/commons/client/ClientPoolFactory.java | 20 ++--
.../apache/iotdb/commons/conf/CommonConfig.java | 108 +++++++++++----------
.../task/subtask/PipeAbstractSinkSubtask.java | 10 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 93 +++++++++---------
.../iotdb/commons/pipe/config/PipeDescriptor.java | 28 +++---
.../pipe/sink/client/IoTDBClientManager.java | 2 +-
.../commons/pipe/sink/client/IoTDBSyncClient.java | 2 +-
.../pipe/sink/client/IoTDBSyncClientManager.java | 4 +-
.../pipe/sink/limiter/GlobalRPCRateLimiter.java | 2 +-
.../pipe/sink/protocol/IoTDBAirGapSink.java | 4 +-
.../pipe/sink/protocol/IoTDBSslSyncSink.java | 2 +-
21 files changed, 159 insertions(+), 154 deletions(-)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index efb7f2abdbf..aca33ba4dbf 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -164,7 +164,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
"Handshake error with target server ip: %s, port: %s, because:
%s.",
client.getIpAddress(), client.getPort(), resp.getStatus()));
} else {
-
client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
IOT_PRINTER.println(
String.format(
"Handshake success. Target server ip: %s, port: %s",
@@ -232,7 +232,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
private void transferFilePieces(final File file, final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
@@ -299,10 +299,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase
{
this.client =
new IoTDBSyncClient(
new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs(
-
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
+
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
-
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
+
PipeConfig.getInstance().isPipeSinkRPCThriftCompressionEnabled())
.build(),
getEndPoint().getIp(),
getEndPoint().getPort(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index bac357368c0..3c36559a300 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,9 +208,8 @@ public class TwoStageAggregateSender implements
AutoCloseable {
private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws
TTransportException {
return new IoTDBSyncClient(
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
- .setRpcThriftCompressionEnabled(
- PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
+
.setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 3cea6c998f8..610b9e5fe1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -70,7 +70,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
@Override
public void runMayThrow() throws Throwable {
-
socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
socket.setSoTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
socket.setKeepAlive(true);
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId,
socket);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5ee7f22bc07..73e2213eea6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -312,7 +312,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF,
Boolean.toString(skipIfNoPrivileges));
-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
waitHandshakeFinished(isHandshakeFinished);
@@ -331,7 +331,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
resp.set(null);
exception.set(null);
-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
client.pipeTransfer(
PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 332ca6bab7c..e296a7e0faa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -389,7 +389,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
long position = 0;
// Try small piece to rebase the file position.
- final byte[] buffer = new
byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()];
+ final byte[] buffer = new
byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()];
try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"r")) {
while (true) {
final int dataLength = randomAccessFile.read(buffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
index bb0dcf2dc09..50972ee4454 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
@@ -446,7 +446,7 @@ public class PipeConsensusSyncSink extends IoTDBSink {
final TCommitId tCommitId,
final TConsensusGroupId tConsensusGroupId)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 485096173c1..630262d4485 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -104,7 +104,7 @@ public class PipeConsensusTsFileInsertionEventHandler
transferMod = event.isWithMod();
currentFile = transferMod ? modFile : tsFile;
- readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
readBuffer = new byte[readFileBufferSize];
position = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index e6e368a5280..95e36ee923f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -599,7 +599,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
// Stop retrying if the execution time exceeds the threshold for better
realtime performance
if (System.currentTimeMillis() - retryStartTime
- >
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())
{
+ >
PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) {
if (retryEventQueueEventCounter.getTabletInsertionEventCount()
<
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3ef46034554..67fa5c9bd8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -126,7 +126,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
readFileBufferSize =
(int)
Math.min(
- PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(),
+ PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
transferMod ? Math.max(tsFile.length(), modFile.length()) :
tsFile.length());
position = 0;
@@ -146,7 +146,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
memoryBlock =
PipeDataNodeResourceManager.memory()
.forceAllocateForTsFileWithRetry(
-
PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()
+
PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()
? readFileBufferSize
: 0);
readBuffer = new byte[readFileBufferSize];
@@ -477,7 +477,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
* @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
*/
private void waitForResourceEnough4Slicing(final long timeoutMs) throws
InterruptedException {
- if
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled())
{
+ if
(!PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
index 1bff56500ea..408cc121590 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
@@ -102,9 +102,7 @@ public class GeneralRegionAttributeSecurityService extends
AbstractPeriodicalSer
// UpdateClearContainer and version / TEndPoint are not calculated
final AtomicInteger limit =
new AtomicInteger(
- CommonDescriptor.getInstance()
- .getConfig()
- .getPipeConnectorRequestSliceThresholdBytes());
+
CommonDescriptor.getInstance().getConfig().getPipeSinkRequestSliceThresholdBytes());
final AtomicBoolean hasRemaining = new AtomicBoolean(false);
final Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>>
attributeUpdateCommitMap =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 7ed34359b21..9e41b585ec4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -281,15 +281,13 @@ public class ClientPoolFactory {
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
- .setRpcThriftCompressionEnabled(
- conf.isPipeConnectorRPCThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(
- conf.getPipeAsyncConnectorSelectorNumber())
+
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.build(),
ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
+
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
@@ -308,16 +306,14 @@ public class ClientPoolFactory {
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
- .setRpcThriftCompressionEnabled(
- conf.isPipeConnectorRPCThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(
- conf.getPipeAsyncConnectorSelectorNumber())
+
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
.build(),
ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 8ed5b73bcd9..fdc1bf7007d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -266,13 +266,13 @@ public class CommonConfig {
private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
private long pipeSourceMatcherCacheSize = 1024;
- private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
- private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
- private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
- private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
+ private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+ private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
+ private int pipeSinkReadFileBufferSize = 5242880; // 5MB
+ private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
private long pipeSinkRetryIntervalMs = 1000L;
private boolean pipeSinkRetryLocallyForConnectionError = true;
- private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+ private boolean pipeSinkRPCThriftCompressionEnabled = false;
private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
@@ -1051,88 +1051,85 @@ public class CommonConfig {
logger.info("pipeSourceMatcherCacheSize is set to {}.",
pipeSourceMatcherCacheSize);
}
- public int getPipeConnectorHandshakeTimeoutMs() {
- return pipeConnectorHandshakeTimeoutMs;
+ public int getPipeSinkHandshakeTimeoutMs() {
+ return pipeSinkHandshakeTimeoutMs;
}
- public void setPipeConnectorHandshakeTimeoutMs(long
pipeConnectorHandshakeTimeoutMs) {
- final int fPipeConnectorHandshakeTimeoutMs =
this.pipeConnectorHandshakeTimeoutMs;
+ public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
+ final int fPipeConnectorHandshakeTimeoutMs =
this.pipeSinkHandshakeTimeoutMs;
try {
- this.pipeConnectorHandshakeTimeoutMs =
Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+ this.pipeSinkHandshakeTimeoutMs =
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
} catch (ArithmeticException e) {
- this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+ this.pipeSinkHandshakeTimeoutMs = Integer.MAX_VALUE;
logger.warn(
"Given pipe connector handshake timeout is too large, set to {}
ms.", Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorHandshakeTimeoutMs !=
this.pipeConnectorHandshakeTimeoutMs) {
+ if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs)
{
logger.info(
- "pipeConnectorHandshakeTimeoutMs is set to {}.",
this.pipeConnectorHandshakeTimeoutMs);
+ "pipeConnectorHandshakeTimeoutMs is set to {}.",
this.pipeSinkHandshakeTimeoutMs);
}
}
}
- public int getPipeConnectorTransferTimeoutMs() {
- return pipeConnectorTransferTimeoutMs;
+ public int getPipeSinkTransferTimeoutMs() {
+ return pipeSinkTransferTimeoutMs;
}
- public void setPipeConnectorTransferTimeoutMs(long
pipeConnectorTransferTimeoutMs) {
- final int fPipeConnectorTransferTimeoutMs =
this.pipeConnectorTransferTimeoutMs;
+ public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
+ final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
try {
- this.pipeConnectorTransferTimeoutMs =
Math.toIntExact(pipeConnectorTransferTimeoutMs);
+ this.pipeSinkTransferTimeoutMs =
Math.toIntExact(pipeSinkTransferTimeoutMs);
} catch (ArithmeticException e) {
- this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+ this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
logger.warn(
"Given pipe connector transfer timeout is too large, set to {} ms.",
Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorTransferTimeoutMs !=
this.pipeConnectorTransferTimeoutMs) {
- logger.info("pipeConnectorTransferTimeoutMs is set to {}.",
pipeConnectorTransferTimeoutMs);
+ if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+ logger.info("pipeConnectorTransferTimeoutMs is set to {}.",
pipeSinkTransferTimeoutMs);
}
}
}
- public int getPipeConnectorReadFileBufferSize() {
- return pipeConnectorReadFileBufferSize;
+ public int getPipeSinkReadFileBufferSize() {
+ return pipeSinkReadFileBufferSize;
}
- public void setPipeConnectorReadFileBufferSize(int
pipeConnectorReadFileBufferSize) {
- if (this.pipeConnectorReadFileBufferSize ==
pipeConnectorReadFileBufferSize) {
+ public void setPipeSinkReadFileBufferSize(int pipeSinkReadFileBufferSize) {
+ if (this.pipeSinkReadFileBufferSize == pipeSinkReadFileBufferSize) {
return;
}
- this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
- logger.info("pipeConnectorReadFileBufferSize is set to {}.",
pipeConnectorReadFileBufferSize);
+ this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
+ logger.info("pipeConnectorReadFileBufferSize is set to {}.",
pipeSinkReadFileBufferSize);
}
- public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
- return isPipeConnectorReadFileBufferMemoryControlEnabled;
+ public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+ return isPipeSinkReadFileBufferMemoryControlEnabled;
}
public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
- boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
- if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
- == isPipeConnectorReadFileBufferMemoryControlEnabled) {
+ boolean isPipeSinkReadFileBufferMemoryControlEnabled) {
+ if (this.isPipeSinkReadFileBufferMemoryControlEnabled
+ == isPipeSinkReadFileBufferMemoryControlEnabled) {
return;
}
- this.isPipeConnectorReadFileBufferMemoryControlEnabled =
- isPipeConnectorReadFileBufferMemoryControlEnabled;
+ this.isPipeSinkReadFileBufferMemoryControlEnabled =
+ isPipeSinkReadFileBufferMemoryControlEnabled;
logger.info(
- "isPipeConnectorReadFileBufferMemoryControlEnabled is set to {}.",
- isPipeConnectorReadFileBufferMemoryControlEnabled);
+ "isPipeSinkReadFileBufferMemoryControlEnabled is set to {}.",
+ isPipeSinkReadFileBufferMemoryControlEnabled);
}
- public void setPipeSinkRPCThriftCompressionEnabled(
- boolean pipeConnectorRPCThriftCompressionEnabled) {
- if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
- == pipeConnectorRPCThriftCompressionEnabled) {
+ public void setPipeSinkRPCThriftCompressionEnabled(boolean
pipeSinkRPCThriftCompressionEnabled) {
+ if (this.isPipeSinkReadFileBufferMemoryControlEnabled ==
pipeSinkRPCThriftCompressionEnabled) {
return;
}
- this.pipeConnectorRPCThriftCompressionEnabled =
pipeConnectorRPCThriftCompressionEnabled;
+ this.pipeSinkRPCThriftCompressionEnabled =
pipeSinkRPCThriftCompressionEnabled;
logger.info(
- "pipeConnectorRPCThriftCompressionEnabled is set to {}.",
- pipeConnectorRPCThriftCompressionEnabled);
+ "pipeSinkRPCThriftCompressionEnabled is set to {}.",
pipeSinkRPCThriftCompressionEnabled);
}
- public boolean isPipeConnectorRPCThriftCompressionEnabled() {
- return pipeConnectorRPCThriftCompressionEnabled;
+ public boolean isPipeSinkRPCThriftCompressionEnabled() {
+ return pipeSinkRPCThriftCompressionEnabled;
}
public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
@@ -1198,11 +1195,11 @@ public class CommonConfig {
pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
}
- public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+ public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
}
- public int getPipeAsyncConnectorSelectorNumber() {
+ public int getPipeAsyncSinkSelectorNumber() {
return pipeAsyncConnectorSelectorNumber;
}
@@ -1220,7 +1217,7 @@ public class CommonConfig {
logger.info("pipeAsyncConnectorSelectorNumber is set to {}.",
pipeAsyncConnectorSelectorNumber);
}
- public int getPipeAsyncConnectorMaxClientNumber() {
+ public int getPipeAsyncSinkMaxClientNumber() {
return pipeAsyncConnectorMaxClientNumber;
}
@@ -1239,7 +1236,7 @@ public class CommonConfig {
"pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxClientNumber);
}
- public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+ public int getPipeAsyncSinkMaxTsFileClientNumber() {
return pipeAsyncConnectorMaxTsFileClientNumber;
}
@@ -1372,6 +1369,17 @@ public class CommonConfig {
return pipeSinkRetryLocallyForConnectionError;
}
+ public void setPipeSinkRetryLocallyForConnectionError(
+ boolean pipeSinkRetryLocallyForConnectionError) {
+ if (this.pipeSinkRetryLocallyForConnectionError ==
pipeSinkRetryLocallyForConnectionError) {
+ return;
+ }
+ this.pipeSinkRetryLocallyForConnectionError =
pipeSinkRetryLocallyForConnectionError;
+ logger.info(
+ "pipeSinkRetryLocallyForConnectionError is set to {}",
+ pipeSinkRetryLocallyForConnectionError);
+ }
+
public int
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
return pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount;
}
@@ -2076,7 +2084,7 @@ public class CommonConfig {
"rateLimiterHotReloadCheckIntervalMs is set to {}",
rateLimiterHotReloadCheckIntervalMs);
}
- public int getPipeConnectorRequestSliceThresholdBytes() {
+ public int getPipeSinkRequestSliceThresholdBytes() {
return pipeConnectorRequestSliceThresholdBytes;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index c57a1fa9b33..760d3c2ffeb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -159,7 +159,8 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
throwable);
int retry = 0;
- while (retry < MAX_RETRY_TIMES) {
+ while (retry < MAX_RETRY_TIMES
+ ||
PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
try {
outputPipeSink.handshake();
LOGGER.info(
@@ -170,15 +171,16 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
retry++;
LOGGER.warn(
"{} failed to handshake with the target system for {} times, "
- + "will retry at most {} times.",
+ +
(PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()
+ ? "will retry forever."
+ : String.format("will retry at most %s times.",
MAX_RETRY_TIMES)),
outputPipeSink.getClass().getName(),
retry,
- MAX_RETRY_TIMES,
e);
try {
sleepIfNoHighPriorityTask(
Math.min(retry, MAX_RETRY_TIMES)
- *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+ * PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
} catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to handshake with the
target system.",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ca43b58e368..69918b59fac 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -103,7 +103,7 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMinimumReceiverMemory();
}
- /////////////////////////////// Subtask Connector
///////////////////////////////
+ /////////////////////////////// Subtask Sink ///////////////////////////////
public int getPipeRealTimeQueuePollTsFileThreshold() {
return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
@@ -165,30 +165,34 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
}
- /////////////////////////////// Connector ///////////////////////////////
+ /////////////////////////////// Sink ///////////////////////////////
- public int getPipeConnectorHandshakeTimeoutMs() {
- return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
+ public int getPipeSinkHandshakeTimeoutMs() {
+ return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
}
- public int getPipeConnectorTransferTimeoutMs() {
- return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
+ public int getPipeSinkTransferTimeoutMs() {
+ return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
}
- public int getPipeConnectorReadFileBufferSize() {
- return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
+ public int getPipeSinkReadFileBufferSize() {
+ return COMMON_CONFIG.getPipeSinkReadFileBufferSize();
}
- public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
- return COMMON_CONFIG.isPipeConnectorReadFileBufferMemoryControlEnabled();
+ public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+ return COMMON_CONFIG.isPipeSinkReadFileBufferMemoryControlEnabled();
}
- public long getPipeConnectorRetryIntervalMs() {
+ public long getPipeSinkRetryIntervalMs() {
return COMMON_CONFIG.getPipeSinkRetryIntervalMs();
}
- public boolean isPipeConnectorRPCThriftCompressionEnabled() {
- return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
+ public boolean isPipeSinkRetryLocallyForConnectionError() {
+ return COMMON_CONFIG.isPipeSinkRetryLocallyForConnectionError();
+ }
+
+ public boolean isPipeSinkRPCThriftCompressionEnabled() {
+ return COMMON_CONFIG.isPipeSinkRPCThriftCompressionEnabled();
}
public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
@@ -203,27 +207,27 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
}
- public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+ public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall();
}
- public int getPipeAsyncConnectorSelectorNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
+ public int getPipeAsyncSinkSelectorNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkSelectorNumber();
}
- public int getPipeAsyncConnectorMaxClientNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
+ public int getPipeAsyncSinkMaxClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxClientNumber();
}
- public int getPipeAsyncConnectorMaxTsFileClientNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+ public int getPipeAsyncSinkMaxTsFileClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxTsFileClientNumber();
}
public double getPipeSendTsFileRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
}
- public double getPipeAllConnectorsRateLimitBytesPerSecond() {
+ public double getPipeAllSinksRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
@@ -231,8 +235,8 @@ public class PipeConfig {
return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
}
- public int getPipeConnectorRequestSliceThresholdBytes() {
- return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
+ public int getPipeSinkRequestSliceThresholdBytes() {
+ return COMMON_CONFIG.getPipeSinkRequestSliceThresholdBytes();
}
public long getPipeMaxReaderChunkSize() {
@@ -503,16 +507,14 @@ public class PipeConfig {
getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
LOGGER.info("PipeSourceMatcherCacheSize: {}",
getPipeSourceMatcherCacheSize());
- LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeConnectorHandshakeTimeoutMs());
- LOGGER.info("PipeConnectorTransferTimeoutMs: {}",
getPipeConnectorTransferTimeoutMs());
- LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
- LOGGER.info(
- "PipeConnectorReadFileBufferMemoryControlEnabled: {}",
- isPipeConnectorReadFileBufferMemoryControlEnabled());
- LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
+ LOGGER.info("PipeSinkHandshakeTimeoutMs: {}",
getPipeSinkHandshakeTimeoutMs());
+ LOGGER.info("PipeSinkTransferTimeoutMs: {}",
getPipeSinkTransferTimeoutMs());
+ LOGGER.info("PipeSinkReadFileBufferSize: {}",
getPipeSinkReadFileBufferSize());
LOGGER.info(
- "PipeConnectorRPCThriftCompressionEnabled: {}",
- isPipeConnectorRPCThriftCompressionEnabled());
+ "PipeSinkReadFileBufferMemoryControlEnabled: {}",
+ isPipeSinkReadFileBufferMemoryControlEnabled());
+ LOGGER.info("PipeSinkRetryIntervalMs: {}", getPipeSinkRetryIntervalMs());
+ LOGGER.info("PipeSinkRPCThriftCompressionEnabled: {}",
isPipeSinkRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
LOGGER.info("PipeMaxAlignedSeriesChunkSizeInOneBatch: {}",
getPipeMaxReaderChunkSize());
@@ -550,34 +552,29 @@ public class PipeConfig {
getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
+ "PipeAsyncSinkForcedRetryTsFileEventQueueSizeThreshold: {}",
getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
+ "PipeAsyncSinkForcedRetryTabletEventQueueSizeThreshold: {}",
getPipeAsyncSinkForcedRetryTabletEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
+ "PipeAsyncSinkForcedRetryTotalEventQueueSizeThreshold: {}",
getPipeAsyncSinkForcedRetryTotalEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
- getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
- LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
- LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
- LOGGER.info(
- "PipeAsyncConnectorMaxTsFileClientNumber: {}",
- getPipeAsyncConnectorMaxTsFileClientNumber());
+ "PipeAsyncSinkMaxRetryExecutionTimeMsPerCall: {}",
+ getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall());
+ LOGGER.info("PipeAsyncSinkSelectorNumber: {}",
getPipeAsyncSinkSelectorNumber());
+ LOGGER.info("PipeAsyncSinkMaxClientNumber: {}",
getPipeAsyncSinkMaxClientNumber());
+ LOGGER.info("PipeAsyncSinkMaxTsFileClientNumber: {}",
getPipeAsyncSinkMaxTsFileClientNumber());
LOGGER.info(
"PipeSendTsFileRateLimitBytesPerSecond: {}",
getPipeSendTsFileRateLimitBytesPerSecond());
LOGGER.info(
- "PipeAllConnectorsRateLimitBytesPerSecond: {}",
- getPipeAllConnectorsRateLimitBytesPerSecond());
+ "PipeAllSinksRateLimitBytesPerSecond: {}",
getPipeAllSinksRateLimitBytesPerSecond());
LOGGER.info(
"RateLimiterHotReloadCheckIntervalMs: {}",
getRateLimiterHotReloadCheckIntervalMs());
- LOGGER.info(
- "PipeConnectorRequestSliceThresholdBytes: {}",
- getPipeConnectorRequestSliceThresholdBytes());
+ LOGGER.info("PipeSinkRequestSliceThresholdBytes: {}",
getPipeSinkRequestSliceThresholdBytes());
LOGGER.info("SeperatedPipeHeartbeatEnabled: {}",
isSeperatedPipeHeartbeatEnabled());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 8a18650b0bb..f4ed2cbd9a7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -334,28 +334,27 @@ public class PipeDescriptor {
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
- config.setPipeConnectorHandshakeTimeoutMs(
+ config.setPipeSinkHandshakeTimeoutMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
.orElse(
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
-
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
- config.setPipeConnectorReadFileBufferSize(
+
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+ config.setPipeSinkReadFileBufferSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_size",
-
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
+
String.valueOf(config.getPipeSinkReadFileBufferSize())))));
config.setIsPipeSinkReadFileBufferMemoryControlEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_memory_control",
- String.valueOf(
-
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
+
String.valueOf(config.isPipeSinkReadFileBufferMemoryControlEnabled())))));
config.setPipeSinkRetryIntervalMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
@@ -363,13 +362,21 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_connector_retry_interval_ms",
String.valueOf(config.getPipeSinkRetryIntervalMs())))));
+ config.setPipeSinkRetryLocallyForConnectionError(
+ Boolean.parseBoolean(
+ Optional.ofNullable(
+
properties.getProperty("pipe_sink_retry_locally_for_connection_error"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_retry_locally_for_connection_error",
+
String.valueOf(config.isPipeSinkRetryLocallyForConnectionError())))));
config.setPipeSinkRPCThriftCompressionEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
.orElse(
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
-
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+
String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
Long.parseLong(
Optional.ofNullable(
@@ -377,8 +384,7 @@ public class PipeDescriptor {
.orElse(
properties.getProperty(
"pipe_async_connector_max_retry_execution_time_ms_per_call",
- String.valueOf(
-
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
+
String.valueOf(config.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall())))));
config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
@@ -415,7 +421,7 @@ public class PipeDescriptor {
Integer.parseInt(
properties.getProperty(
"pipe_connector_request_slice_threshold_bytes",
-
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
+
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
Long.parseLong(
@@ -565,7 +571,7 @@ public class PipeDescriptor {
parserPipeConfig(
properties, "pipe_sink_timeout_ms", "pipe_connector_timeout_ms",
isHotModify);
if (value != null) {
- config.setPipeConnectorTransferTimeoutMs(Long.parseLong(value));
+ config.setPipeSinkTransferTimeoutMs(Long.parseLong(value));
}
value =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
index 3e009b279d7..1f76f5d2453 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
@@ -58,7 +58,7 @@ public abstract class IoTDBClientManager {
private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; //
1 day
private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000;
// 6 hours
protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
- new
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+ new
AtomicInteger(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
protected IoTDBClientManager(
final List<TEndPoint> endPointList,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index a0bb65b5a7e..b7f42295e6c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -100,7 +100,7 @@ public class IoTDBSyncClient extends
IClientRPCService.Client
@Override
public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws
TException {
- final int bodySizeLimit =
PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes();
+ final int bodySizeLimit =
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
|| req.body.limit() < bodySizeLimit) {
return super.pipeTransfer(req);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 4c5d51f83c5..76c145d0dab 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -198,9 +198,9 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
clientAndStatus.setLeft(
new IoTDBSyncClient(
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
- PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+ PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
index 9a6aba5b90b..e497656eab2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
@@ -28,6 +28,6 @@ public class GlobalRPCRateLimiter extends GlobalRateLimiter {
@Override
protected double getThroughputBytesPerSecond() {
- return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+ return CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index aba6c7e1d82..7d84e3bb98f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -242,7 +242,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
} else {
supportModsIfIsDataNodeReceiver = true;
}
- socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+ socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
LOGGER.info("Handshake success. Socket: {}", socket);
}
@@ -269,7 +269,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
final AirGapSocket socket,
final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
index b6b8e52f1fa..75a4607a23b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
@@ -184,7 +184,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {