This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch squash-dbgw-master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/squash-dbgw-master by this
push:
new 56c2dfaef30 Rate limiter: For squashing (#15862)
56c2dfaef30 is described below
commit 56c2dfaef3001913f278db4eb7ddc3556d770934
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 3 12:36:02 2025 +0800
Rate limiter: For squashing (#15862)
---
.../protocol/IoTDBConfigRegionAirGapConnector.java | 5 ++
.../protocol/IoTDBConfigRegionConnector.java | 5 ++
.../task/builder/PipeDataNodeTaskBuilder.java | 68 ++++++++++++++--------
.../airgap/IoTDBDataRegionAirGapConnector.java | 31 ++++++++++
.../airgap/IoTDBSchemaRegionAirGapConnector.java | 5 ++
.../async/IoTDBDataRegionAsyncConnector.java | 14 +++++
.../async/handler/PipeTransferTsFileHandler.java | 6 ++
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 21 +++++++
.../thrift/sync/IoTDBSchemaRegionConnector.java | 5 ++
.../dataregion/IoTDBDataRegionExtractor.java | 10 +++-
.../pipe/metric/overview/PipeResourceMetrics.java | 14 +++++
.../load/limiter/LoadTsFileRateLimiter.java | 64 +++-----------------
.../apache/iotdb/commons/conf/CommonConfig.java | 16 +++++
.../iotdb/commons/pipe/config/PipeConfig.java | 6 ++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 6 ++
.../config/constant/PipeConnectorConstant.java | 5 ++
.../connector/limiter/GlobalRPCRateLimiter.java | 33 +++++++++++
.../pipe/connector/limiter/GlobalRateLimiter.java | 14 ++---
.../connector/limiter/TsFileSendRateLimiter.java | 47 +++++++++++++++
.../connector/protocol/IoTDBAirGapConnector.java | 3 +
.../pipe/connector/protocol/IoTDBConnector.java | 4 +-
.../connector/protocol/IoTDBSslSyncConnector.java | 3 +
.../iotdb/commons/service/metric/enums/Metric.java | 1 +
23 files changed, 294 insertions(+), 92 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index ab3f2707544..9e9976bc3dc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -89,6 +89,11 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
+ @Override
+ protected void mayLimitRateAndRecordIO(final long requiredBytes) {
+ // Do nothing
+ }
+
@Override
protected boolean mayNeedHandshakeWhenFail() {
return true;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 4453cf4e063..c3a1c0dd522 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -103,6 +103,11 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
return PipeTransferConfigSnapshotPieceReq.toTPipeTransferReq(fileName,
position, payLoad);
}
+ @Override
+ protected void mayLimitRateAndRecordIO(final long requiredBytes) {
+ // Do nothing
+ }
+
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 30c48ef5b7e..0df7f1c133b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -58,8 +58,11 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_SNAPSHOT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
public class PipeDataNodeTaskBuilder {
@@ -195,10 +198,6 @@ public class PipeDataNodeTaskBuilder {
||
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
}
- if (!insertionDeletionListeningOptionPair.right
- && !shouldTerminatePipeOnAllHistoricalEventsConsumed) {
- return;
- }
} catch (final IllegalPathException e) {
LOGGER.warn(
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion'
parameters: {}",
@@ -207,29 +206,52 @@ public class PipeDataNodeTaskBuilder {
return;
}
- final Boolean isRealtime =
- connectorParameters.getBooleanByKeys(
- PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
- PipeConnectorConstant.SINK_REALTIME_FIRST_KEY);
- if (isRealtime == null) {
-
connectorParameters.addAttribute(PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
"false");
- if (insertionDeletionListeningOptionPair.right) {
- LOGGER.info(
- "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete',
'realtime-first' is defaulted to 'false' to prevent sync issues after
deletion.");
- } else {
- LOGGER.info(
- "PipeDataNodeTaskBuilder: When extractor uses snapshot model,
'realtime-first' is defaulted to 'false' to prevent premature halt before
transfer completion.");
+ if (insertionDeletionListeningOptionPair.right
+ || shouldTerminatePipeOnAllHistoricalEventsConsumed) {
+ final Boolean isRealtime =
+ connectorParameters.getBooleanByKeys(
+ PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
+ PipeConnectorConstant.SINK_REALTIME_FIRST_KEY);
+ if (isRealtime == null) {
+ connectorParameters.addAttribute(
+ PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, "false");
+ if (insertionDeletionListeningOptionPair.right) {
+ LOGGER.info(
+ "PipeDataNodeTaskBuilder: When 'inclusion' contains
'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues
after deletion.");
+ } else {
+ LOGGER.info(
+ "PipeDataNodeTaskBuilder: When extractor uses snapshot model,
'realtime-first' is defaulted to 'false' to prevent premature halt before
transfer completion.");
+ }
+ } else if (isRealtime) {
+ if (insertionDeletionListeningOptionPair.right) {
+ LOGGER.warn(
+ "PipeDataNodeTaskBuilder: When 'inclusion' includes
'data.delete', 'realtime-first' set to 'true' may result in data
synchronization issues after deletion.");
+ } else {
+ LOGGER.warn(
+ "PipeDataNodeTaskBuilder: When extractor uses snapshot model,
'realtime-first' set to 'true' may cause prevent premature halt before transfer
completion.");
+ }
}
- return;
}
- if (isRealtime) {
- if (insertionDeletionListeningOptionPair.right) {
- LOGGER.warn(
- "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete',
'realtime-first' set to 'true' may result in data synchronization issues after
deletion.");
- } else {
+ final boolean isRealtimeEnabled =
+ extractorParameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
+ EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
+
+ if (isRealtimeEnabled &&
!shouldTerminatePipeOnAllHistoricalEventsConsumed) {
+ final Boolean enableSendTsFileLimit =
+ connectorParameters.getBooleanByKeys(
+ PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
+ PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
+
+ if (enableSendTsFileLimit == null) {
+ connectorParameters.addAttribute(
+ PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true");
+ LOGGER.info(
+ "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we
enable rate limiter in sending tsfile by default to reserve disk and network IO
for realtime sending.");
+ } else if (!enableSendTsFileLimit) {
LOGGER.warn(
- "PipeDataNodeTaskBuilder: When extractor uses snapshot model,
'realtime-first' set to 'true' may cause prevent premature halt before transfer
completion.");
+ "PipeDataNodeTaskBuilder: When the realtime sync is enabled, not
enabling the rate limiter in sending tsfile may introduce delay for realtime
sending.");
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 009a7b723ac..4e81944a057 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
@@ -34,11 +35,14 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -51,8 +55,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Objects;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
+
@TreeModel
@TableModel
public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector {
@@ -60,6 +69,20 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionAirGapConnector.class);
+ private boolean enableSendTsFileLimit;
+
+ @Override
+ public void customize(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
+ throws Exception {
+ super.customize(parameters, configuration);
+
+ enableSendTsFileLimit =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT,
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
+ CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
+ }
+
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
// PipeProcessor can change the type of TabletInsertionEvent
@@ -356,6 +379,14 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
}
+ @Override
+ protected void mayLimitRateAndRecordIO(final long requiredBytes) {
+ PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
+ if (enableSendTsFileLimit) {
+ TsFileSendRateLimiter.getInstance().acquire(requiredBytes);
+ }
+ }
+
@Override
protected byte[] getTransferSingleFilePieceBytes(
final String fileName, final long position, final byte[] payLoad) throws
IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index 22a127905ef..d6716935695 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -198,6 +198,11 @@ public class IoTDBSchemaRegionAirGapConnector extends
IoTDBDataNodeAirGapConnect
}
}
+ @Override
+ protected void mayLimitRateAndRecordIO(final long requiredBytes) {
+ // Do nothing
+ }
+
@Override
protected byte[] getTransferSingleFilePieceBytes(
final String fileName, final long position, final byte[] payLoad) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index afac1c5c742..3b7a0f862c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -82,8 +82,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
@@ -123,6 +126,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private final Map<PipeTransferTrackableHandler,
PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();
+ private boolean enableSendTsFileLimit;
+
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
super.validate(validator);
@@ -178,6 +183,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
}
+
+ enableSendTsFileLimit =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT,
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
+ CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
}
@Override
@@ -686,6 +696,10 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
events.forEach(this::addFailureEventToRetryQueue);
}
+ public boolean isEnableSendTsFileLimit() {
+ return enableSendTsFileLimit;
+ }
+
//////////////////////////// Operations for close
////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index b6dcf194732..d43a9799116 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -31,6 +32,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
@@ -162,6 +164,10 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
client.setShouldReturnSelf(false);
client.setTimeoutDynamically(clientManager.getConnectionTimeout());
+ PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize);
+ if (connector.isEnableSendTsFileLimit()) {
+ TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize);
+ }
final int readLength = reader.read(readBuffer);
if (readLength == -1) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 47eb0a5c63c..5d222fd79ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
+import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -43,6 +44,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.annotation.TableModel;
@@ -67,11 +69,16 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
+
@TreeModel
@TableModel
public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
@@ -79,6 +86,7 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class);
private PipeTransferBatchReqBuilder tabletBatchBuilder;
+ private boolean enableSendTsFileLimit;
@Override
public void customize(
@@ -90,6 +98,11 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
}
+
+ enableSendTsFileLimit =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT,
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
+ CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
}
@Override
@@ -104,6 +117,14 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName,
position, payLoad);
}
+ @Override
+ protected void mayLimitRateAndRecordIO(final long requiredBytes) {
+ PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
+ if (enableSendTsFileLimit) {
+ TsFileSendRateLimiter.getInstance().acquire(requiredBytes);
+ }
+ }
+
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
// PipeProcessor can change the type of TabletInsertionEvent
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 7ce697e905a..4c19afb026a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -244,4 +244,9 @@ public class IoTDBSchemaRegionConnector extends
IoTDBDataNodeSyncConnector {
final String fileName, final long position, final byte[] payLoad) throws
IOException {
return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName,
position, payLoad);
}
+
+ @Override
+ protected void mayLimitRateAndRecordIO(final long requiredBytes) {
+ // Do nothing
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 7c85881ff93..3ee2d655ec9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -341,15 +341,19 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
EXTRACTOR_END_TIME_KEY)
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_ENABLE_KEY,
- EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY,
- SOURCE_REALTIME_ENABLE_KEY)) {
+ SOURCE_HISTORY_START_TIME_KEY,
+ EXTRACTOR_HISTORY_START_TIME_KEY,
+ SOURCE_HISTORY_END_TIME_KEY,
+ EXTRACTOR_HISTORY_END_TIME_KEY)) {
LOGGER.warn(
- "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is
invalid.",
+ "When {}, {}, {} or {} is specified, specifying {}, {}, {}, {}, {}
and {} is invalid.",
SOURCE_START_TIME_KEY,
EXTRACTOR_START_TIME_KEY,
SOURCE_END_TIME_KEY,
EXTRACTOR_END_TIME_KEY,
+ SOURCE_HISTORY_ENABLE_KEY,
+ EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_START_TIME_KEY,
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
index 9eb71b37124..7d9b172cc92 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
@@ -27,7 +27,9 @@ import
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
@@ -41,6 +43,8 @@ public class PipeResourceMetrics implements IMetricSet {
private static final String PIPE_TOTAL_MEMORY = "PipeTotalMemory";
+ private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@Override
@@ -96,6 +100,10 @@ public class PipeResourceMetrics implements IMetricSet {
MetricLevel.IMPORTANT,
PipeDataNodeResourceManager.ref(),
PipePhantomReferenceManager::getPhantomReferenceCount);
+ // tsFile send rate
+ diskIOCounter =
+ metricService.getOrCreateCounter(
+ Metric.PIPE_TSFILE_SEND_DISK_IO.toString(), MetricLevel.IMPORTANT);
}
@Override
@@ -121,6 +129,12 @@ public class PipeResourceMetrics implements IMetricSet {
metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_LINKED_TSFILE_SIZE.toString());
// phantom reference count
metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
+
+ metricService.remove(MetricType.RATE,
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
+ }
+
+ public void recordDiskIO(final long bytes) {
+ diskIOCounter.inc(bytes);
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
index eaaf376a819..3be223287a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
@@ -19,78 +19,28 @@
package org.apache.iotdb.db.storageengine.load.limiter;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
-import com.google.common.util.concurrent.AtomicDouble;
-import com.google.common.util.concurrent.RateLimiter;
-
-import java.util.concurrent.TimeUnit;
-
-public class LoadTsFileRateLimiter {
+public class LoadTsFileRateLimiter extends GlobalRateLimiter {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private final AtomicDouble throughputBytesPerSecond =
- new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond());
- private final RateLimiter loadWriteRateLimiter;
-
+ @Override
public void acquire(long bytes) {
LoadTsFileCostMetricsSet.getInstance().recordDiskIO(bytes);
-
- if (reloadParams()) {
- return;
- }
-
- while (bytes > 0) {
- if (bytes > Integer.MAX_VALUE) {
- tryAcquireWithRateCheck(Integer.MAX_VALUE);
- bytes -= Integer.MAX_VALUE;
- } else {
- tryAcquireWithRateCheck((int) bytes);
- return;
- }
- }
+ super.acquire(bytes);
}
- private void tryAcquireWithRateCheck(final int bytes) {
- while (!loadWriteRateLimiter.tryAcquire(
- bytes,
- PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(),
- TimeUnit.MILLISECONDS)) {
- if (reloadParams()) {
- return;
- }
- }
- }
-
- private boolean reloadParams() {
- final double throughputBytesPerSecondLimit =
CONFIG.getLoadWriteThroughputBytesPerSecond();
-
- if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) {
- throughputBytesPerSecond.set(throughputBytesPerSecondLimit);
- loadWriteRateLimiter.setRate(
- // if throughput <= 0, disable rate limiting
- throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE :
throughputBytesPerSecondLimit);
- }
-
- // For performance, we don't need to acquire rate limiter if throughput <= 0
- return throughputBytesPerSecondLimit <= 0;
+ @Override
+ protected double getThroughputBytesPerSecond() {
+ return CONFIG.getLoadWriteThroughputBytesPerSecond();
}
//////////////////////////// Singleton ////////////////////////////
- private LoadTsFileRateLimiter() {
- final double throughputBytesPerSecondLimit =
throughputBytesPerSecond.get();
- loadWriteRateLimiter =
- // if throughput <= 0, disable rate limiting
- throughputBytesPerSecondLimit <= 0
- ? RateLimiter.create(Double.MAX_VALUE)
- : RateLimiter.create(throughputBytesPerSecondLimit);
- }
-
private static class LoadTsFileRateLimiterHolder {
private static final LoadTsFileRateLimiter INSTANCE = new
LoadTsFileRateLimiter();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 66a62f0b9ed..5d79b138067 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -250,6 +250,7 @@ public class CommonConfig {
private int pipeAsyncConnectorMaxTsFileClientNumber =
Math.max(16, Runtime.getRuntime().availableProcessors());
+ private double pipeSendTsFileRateLimitBytesPerSecond = 32 * MB;
private double pipeAllSinksRateLimitBytesPerSecond = -1;
private int rateLimiterHotReloadCheckIntervalMs = 1000;
@@ -2040,6 +2041,21 @@ public class CommonConfig {
logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync);
}
+ public double getPipeSendTsFileRateLimitBytesPerSecond() {
+ return pipeSendTsFileRateLimitBytesPerSecond;
+ }
+
+ public void setPipeSendTsFileRateLimitBytesPerSecond(
+ double pipeSendTsFileRateLimitBytesPerSecond) {
+ if (this.pipeSendTsFileRateLimitBytesPerSecond ==
pipeSendTsFileRateLimitBytesPerSecond) {
+ return;
+ }
+ this.pipeSendTsFileRateLimitBytesPerSecond =
pipeSendTsFileRateLimitBytesPerSecond;
+ logger.info(
+ "pipeSendTsFileRateLimitBytesPerSecond is set to {}",
+ pipeSendTsFileRateLimitBytesPerSecond);
+ }
+
public double getPipeAllSinksRateLimitBytesPerSecond() {
return pipeAllSinksRateLimitBytesPerSecond;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 07b25b0f0f2..dc2b4350960 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -215,6 +215,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
}
+ public double getPipeSendTsFileRateLimitBytesPerSecond() {
+ return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
+ }
+
public double getPipeAllConnectorsRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
@@ -586,6 +590,8 @@ public class PipeConfig {
"PipeAsyncConnectorMaxTsFileClientNumber: {}",
getPipeAsyncConnectorMaxTsFileClientNumber());
+ LOGGER.info(
+ "PipeSendTsFileRateLimitBytesPerSecond: {}", getPipeSendTsFileRateLimitBytesPerSecond());
LOGGER.info(
"PipeAllConnectorsRateLimitBytesPerSecond: {}",
getPipeAllConnectorsRateLimitBytesPerSecond());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 2feecc0ee56..b754a8581aa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -637,6 +637,12 @@ public class PipeDescriptor {
config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
}
+ value =
+ parserPipeConfig(properties, "pipe_send_tsfile_rate_limit_bytes_per_second", isHotModify);
+ if (value != null) {
+ config.setPipeSendTsFileRateLimitBytesPerSecond(Double.parseDouble(value));
+ }
+
value = parserPipeConfig(properties,
"pipe_all_sinks_rate_limit_bytes_per_second", isHotModify);
if (value != null) {
config.setPipeAllSinksRateLimitBytesPerSecond(Double.parseDouble(value));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index f0934d86d35..cd540cfd226 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -220,6 +220,11 @@ public class PipeConnectorConstant {
public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE =
Zstd.minCompressionLevel();
public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE =
Zstd.maxCompressionLevel();
+ public static final String CONNECTOR_ENABLE_SEND_TSFILE_LIMIT =
+ "connector.enable-send-tsfile-limit";
+ public static final String SINK_ENABLE_SEND_TSFILE_LIMIT = "sink.enable-send-tsfile-limit";
+ public static final boolean CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE = false;
+
public static final String CONNECTOR_RATE_LIMIT_KEY =
"connector.rate-limit-bytes-per-second";
public static final String SINK_RATE_LIMIT_KEY =
"sink.rate-limit-bytes-per-second";
public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java
new file mode 100644
index 00000000000..ee450d16330
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.limiter;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+/** This is a global rate limiter for all connectors. */
+public class GlobalRPCRateLimiter extends GlobalRateLimiter {
+
+ private static final PipeConfig CONFIG = PipeConfig.getInstance();
+
+ @Override
+ protected double getThroughputBytesPerSecond() {
+ return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
index 08190eff500..bb583901796 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
@@ -26,18 +26,17 @@ import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
-/** This is a global rate limiter for all connectors. */
-public class GlobalRateLimiter {
-
- private static final PipeConfig CONFIG = PipeConfig.getInstance();
+public abstract class GlobalRateLimiter {
private final AtomicDouble throughputBytesPerSecond =
- new AtomicDouble(CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond());
+ new AtomicDouble(getThroughputBytesPerSecond());
+
private final RateLimiter rateLimiter;
public GlobalRateLimiter() {
final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get();
rateLimiter =
+ // if throughput <= 0, disable rate limiting
throughputBytesPerSecondLimit <= 0
? RateLimiter.create(Double.MAX_VALUE)
: RateLimiter.create(throughputBytesPerSecondLimit);
@@ -71,8 +70,7 @@ public class GlobalRateLimiter {
}
private boolean reloadParams() {
- final double throughputBytesPerSecondLimit =
- CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+ final double throughputBytesPerSecondLimit = getThroughputBytesPerSecond();
if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) {
throughputBytesPerSecond.set(throughputBytesPerSecondLimit);
@@ -84,4 +82,6 @@ public class GlobalRateLimiter {
// For performance, we don't need to acquire rate limiter if throughput <= 0
return throughputBytesPerSecondLimit <= 0;
}
+
+ protected abstract double getThroughputBytesPerSecond();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java
new file mode 100644
index 00000000000..ff5e4e111b3
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.limiter;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+public class TsFileSendRateLimiter extends GlobalRateLimiter {
+
+ private static final PipeConfig CONFIG = PipeConfig.getInstance();
+
+ @Override
+ protected double getThroughputBytesPerSecond() {
+ return CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
+ }
+
+ //////////////////////////// Singleton ////////////////////////////
+
+ private static class TsFileSendRateLimiterHolder {
+
+ private static final TsFileSendRateLimiter INSTANCE = new TsFileSendRateLimiter();
+
+ private TsFileSendRateLimiterHolder() {
+ // Prevent instantiation
+ }
+ }
+
+ public static TsFileSendRateLimiter getInstance() {
+ return TsFileSendRateLimiter.TsFileSendRateLimiterHolder.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 9629bcc7806..7c396db352b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -263,6 +263,7 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
while (true) {
+ mayLimitRateAndRecordIO(readFileBufferSize);
final int readLength = reader.read(readBuffer);
if (readLength == -1) {
break;
@@ -298,6 +299,8 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
}
}
+ protected abstract void mayLimitRateAndRecordIO(final long requiredBytes);
+
protected abstract boolean mayNeedHandshakeWhenFail();
protected abstract byte[] getTransferSingleFilePieceBytes(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 8a434dc888f..24e5f28b328 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -24,7 +24,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeE
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorConfig;
import
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
-import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter;
+import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRPCRateLimiter;
import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
@@ -168,7 +168,7 @@ public abstract class IoTDBConnector implements
PipeConnector {
private static final Map<Pair<String, Long>, PipeEndPointRateLimiter>
PIPE_END_POINT_RATE_LIMITER_MAP = new ConcurrentHashMap<>();
private double endPointRateLimitBytesPerSecond = -1;
- private static final GlobalRateLimiter GLOBAL_RATE_LIMITER = new GlobalRateLimiter();
+ private static final GlobalRPCRateLimiter GLOBAL_RATE_LIMITER = new GlobalRPCRateLimiter();
protected boolean isTabletBatchModeEnabled = true;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 578ae675db4..eeb32ee5810 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -179,6 +179,7 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
while (true) {
+ mayLimitRateAndRecordIO(readFileBufferSize);
final int readLength = reader.read(readBuffer);
if (readLength == -1) {
break;
@@ -248,6 +249,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
protected abstract PipeTransferFilePieceReq getTransferMultiFilePieceReq(
final String fileName, final long position, final byte[] payLoad) throws
IOException;
+ protected abstract void mayLimitRateAndRecordIO(final long requiredBytes);
+
@Override
public void close() {
if (clientManager != null) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 17444b3e5ac..aa15f516bfa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -171,6 +171,7 @@ public enum Metric {
PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"),
PIPE_LINKED_TSFILE_SIZE("pipe_linked_tsfile_size"),
PIPE_PHANTOM_REFERENCE_COUNT("pipe_phantom_reference_count"),
+ PIPE_TSFILE_SEND_DISK_IO("pipe_tsfile_send_disk_io"),
PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"),
PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"),
PIPE_PROCEDURE("pipe_procedure"),