This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e3aa4d8954c Pipe: RPC compression for data sync connectors (SNAPPY,
GZIP, LZ4, ZSTD and LZMA2) (#12519)
e3aa4d8954c is described below
commit e3aa4d8954c75ce231d887d41f57bbd942b40bb3
Author: Zikun Ma <[email protected]>
AuthorDate: Wed May 15 18:28:07 2024 +0800
Pipe: RPC compression for data sync connectors (SNAPPY, GZIP, LZ4, ZSTD and
LZMA2) (#12519)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../IoTDBPipeConnectorCompressionIT.java | 182 +++++++++++++++++++++
.../parameter/PipeParameterValidator.java | 4 +-
.../protocol/IoTDBConfigRegionAirGapConnector.java | 27 +--
.../protocol/IoTDBConfigRegionConnector.java | 20 ++-
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 3 +
.../evolvable/request/PipeTransferPlanNodeReq.java | 1 +
.../airgap/IoTDBDataNodeAirGapConnector.java | 12 +-
.../airgap/IoTDBDataRegionAirGapConnector.java | 24 +--
.../airgap/IoTDBSchemaRegionAirGapConnector.java | 15 +-
.../async/IoTDBDataRegionAsyncConnector.java | 18 +-
.../PipeTransferTabletBatchEventHandler.java | 7 +-
.../PipeTransferTsFileInsertionEventHandler.java | 19 ++-
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 5 +-
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 34 ++--
.../thrift/sync/IoTDBSchemaRegionConnector.java | 15 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 3 +
.../config/constant/PipeConnectorConstant.java | 18 ++
.../pipe/connector/compressor/PipeCompressor.java | 57 +++++++
.../compressor/PipeCompressorFactory.java | 80 +++++++++
.../connector/compressor/PipeGZIPCompressor.java | 47 ++++++
.../connector/compressor/PipeLZ4Compressor.java | 47 ++++++
.../connector/compressor/PipeLZMA2Compressor.java | 47 ++++++
.../connector/compressor/PipeSnappyCompressor.java | 47 ++++++
.../connector/compressor/PipeZSTDCompressor.java | 47 ++++++
.../payload/thrift/request/PipeRequestType.java | 3 +
.../thrift/request/PipeTransferCompressedReq.java | 146 +++++++++++++++++
.../connector/protocol/IoTDBAirGapConnector.java | 7 +-
.../pipe/connector/protocol/IoTDBConnector.java | 62 +++++++
.../connector/protocol/IoTDBSslSyncConnector.java | 8 +-
29 files changed, 921 insertions(+), 84 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
new file mode 100644
index 00000000000..e5f76a00eb0
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeConnectorCompressionIT extends AbstractPipeDualAutoIT {
+ // Each case creates pipe p1 with the connector.compressor attribute set and checks row counts on the receiver.
+ @Override
+ @Before
+ public void setUp() {
+ // Overridden so that the receiver environment is configured with setPipeAirGapReceiverEnabled(true).
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+ // Both environments enable schema auto-creation and use RATIS_CONSENSUS for ConfigNode and SchemaRegion.
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setPipeAirGapReceiverEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+
+ // 600000 ms (10 min) connection timeout on both ConfigNode configs, so that the operations will not time out
+ senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
+
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
+
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ }
+ // doTest arguments: connector type, extractor realtime mode, connector batch switch, compressor list.
+ @Test
+ public void testCompression1() throws Exception {
+ doTest("iotdb-thrift-connector", "stream", true, "snappy");
+ }
+
+ @Test
+ public void testCompression2() throws Exception {
+ doTest("iotdb-thrift-connector", "batch", true, "snappy, lzma2");
+ }
+ // This case lists the same compressor name twice.
+ @Test
+ public void testCompression3() throws Exception {
+ doTest("iotdb-thrift-sync-connector", "stream", false, "snappy, snappy");
+ }
+
+ @Test
+ public void testCompression4() throws Exception {
+ doTest("iotdb-thrift-sync-connector", "batch", true, "gzip, zstd");
+ }
+
+ @Test
+ public void testCompression5() throws Exception {
+ doTest("iotdb-air-gap-connector", "stream", false, "lzma2, lz4");
+ }
+
+ @Test
+ public void testCompression6() throws Exception {
+ doTest("iotdb-air-gap-connector", "batch", true, "lzma2");
+ }
+ // Shared driver: creates and starts pipe p1, then expects count(root.db.d1.s1) on the receiver to be 2 and later 8.
+ private void doTest(
+ String connectorType, String realtimeMode, boolean useBatchMode, String
compressionTypes)
+ throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+ // Connector types containing air-gap target the receiver's air-gap port; all others use getPort().
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort =
+ connectorType.contains("air-gap")
+ ? receiverDataNode.getPipeAirGapReceiverPort()
+ : receiverDataNode.getPort();
+ // If either write helper below returns false, the method returns early without further assertions.
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values
(2010-01-01T10:00:00+08:00, 1)",
+ "insert into root.db.d1(time, s1) values
(2010-01-02T10:00:00+08:00, 2)",
+ "flush"))) {
+ return;
+ }
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor", "iotdb-extractor");
+ extractorAttributes.put("extractor.realtime.mode", realtimeMode);
+
+ processorAttributes.put("processor", "do-nothing-processor");
+
+ connectorAttributes.put("connector", connectorType);
+ connectorAttributes.put("connector.batch.enable", useBatchMode ? "true"
: "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ connectorAttributes.put("connector.user", "root");
+ connectorAttributes.put("connector.password", "root");
+ connectorAttributes.put("connector.compressor", compressionTypes);
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+ // The two rows inserted before the pipe was created must show up on the receiver.
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+ // Six more rows are written after the pipe was started; the receiver total must then reach 8.
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (now(), 3)",
+ "insert into root.db.d1(time, s1) values (now(), 4)",
+ "insert into root.db.d1(time, s1) values (now(), 5)",
+ "insert into root.db.d1(time, s1) values (now(), 6)",
+ "insert into root.db.d1(time, s1) values (now(), 7)",
+ "insert into root.db.d1(time, s1) values (now(), 8)",
+ "flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("8,"));
+ }
+ }
+}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
index 01e9507f7fe..1fa0046ccda 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
@@ -69,7 +69,9 @@ public class PipeParameterValidator {
}
throw new PipeParameterNotValidException(
- String.format("The value of %s should be one of %s", key,
Arrays.toString(optionalValues)));
+ String.format(
+ "Invalid value %s of %s. The value should be one of %s",
+ actualValue, key, Arrays.toString(optionalValues)));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index 3235043fb45..3ca8031d268 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -56,8 +56,9 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
@Override
protected byte[] generateHandShakeV1Payload() throws IOException {
- return PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
- CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ return compressIfNeeded(
+ PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
}
@Override
@@ -70,7 +71,7 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
- return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
+ return
compressIfNeeded(PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params));
}
@Override
@@ -150,8 +151,9 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
throws PipeException, IOException {
if (!send(
socket,
- PipeTransferConfigPlanReq.toTPipeTransferBytes(
- pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
+ compressIfNeeded(
+ PipeTransferConfigPlanReq.toTPipeTransferBytes(
+ pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())))) {
final String errorMessage =
String.format(
"Transfer config region write plan %s error. Socket: %s.",
@@ -197,13 +199,14 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
// 2. Transfer file seal signal, which means the snapshots are transferred
completely
if (!send(
socket,
- PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
- snapshot.getName(),
- snapshot.length(),
- Objects.nonNull(templateFile) ? templateFile.getName() : null,
- Objects.nonNull(templateFile) ? templateFile.length() : 0,
- pipeConfigRegionSnapshotEvent.getFileType(),
- pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
+ compressIfNeeded(
+ PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
+ snapshot.getName(),
+ snapshot.length(),
+ Objects.nonNull(templateFile) ? templateFile.getName() : null,
+ Objects.nonNull(templateFile) ? templateFile.length() : 0,
+ pipeConfigRegionSnapshotEvent.getFileType(),
+ pipeConfigRegionSnapshotEvent.toSealTypeString())))) {
final String errorMessage =
String.format("Seal config region snapshot %s error. Socket %s.",
snapshot, socket);
// Send handshake because we don't know whether the receiver side
configNode
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index c48e29cd72d..a2962c194f3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -127,8 +127,9 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
clientAndStatus
.getLeft()
.pipeTransfer(
- PipeTransferConfigPlanReq.toTPipeTransferReq(
- pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
+ compressIfNeeded(
+ PipeTransferConfigPlanReq.toTPipeTransferReq(
+
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
@@ -192,13 +193,14 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
clientAndStatus
.getLeft()
.pipeTransfer(
- PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
- snapshotFile.getName(),
- snapshotFile.length(),
- Objects.nonNull(templateFile) ? templateFile.getName() :
null,
- Objects.nonNull(templateFile) ? templateFile.length() :
0,
- snapshotEvent.getFileType(),
- snapshotEvent.toSealTypeString()));
+ compressIfNeeded(
+ PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
+ snapshotFile.getName(),
+ snapshotFile.length(),
+ Objects.nonNull(templateFile) ?
templateFile.getName() : null,
+ Objects.nonNull(templateFile) ?
templateFile.length() : 0,
+ snapshotEvent.getFileType(),
+ snapshotEvent.toSealTypeString())));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 502d4b50f73..0a78553516a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
@@ -124,6 +125,8 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
case TRANSFER_CONFIG_SNAPSHOT_SEAL:
return handleTransferFileSealV2(
PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
+ case TRANSFER_COMPRESSED:
+ return
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
default:
break;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferPlanNodeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferPlanNodeReq.java
index 9209473c3a2..580dbe2ed4c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferPlanNodeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferPlanNodeReq.java
@@ -72,6 +72,7 @@ public class PipeTransferPlanNodeReq extends TPipeTransferReq
{
}
/////////////////////////////// Air Gap ///////////////////////////////
+
public static byte[] toTPipeTransferBytes(PlanNode planNode) throws
IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 09ecfce07be..01c7de76da2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -88,8 +88,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
@Override
protected byte[] generateHandShakeV1Payload() throws IOException {
- return PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
- CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ return compressIfNeeded(
+ PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
}
@Override
@@ -102,7 +103,7 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
- return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
+ return
compressIfNeeded(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params));
}
protected void doTransferWrapper(
@@ -126,8 +127,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
throws PipeException, IOException {
if (!send(
socket,
- PipeTransferPlanNodeReq.toTPipeTransferBytes(
- pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
+ compressIfNeeded(
+ PipeTransferPlanNodeReq.toTPipeTransferBytes(
+ pipeSchemaRegionWritePlanEvent.getPlanNode())))) {
final String errorMessage =
String.format(
"Transfer data node write plan %s error. Socket: %s.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 0548e4da48f..84ca0b0ccdb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -170,10 +170,11 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
final byte[] bytes =
- Objects.isNull(insertNode)
- ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer())
- : PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
+ compressIfNeeded(
+ Objects.isNull(insertNode)
+ ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+ :
PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode));
if (!send(socket, bytes)) {
final String errorMessage =
@@ -209,9 +210,10 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
throws PipeException, IOException {
if (!send(
socket,
- PipeTransferTabletRawReq.toTPipeTransferBytes(
- pipeRawTabletInsertionEvent.convertToTablet(),
- pipeRawTabletInsertionEvent.isAligned()))) {
+ compressIfNeeded(
+ PipeTransferTabletRawReq.toTPipeTransferBytes(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned())))) {
final String errorMessage =
String.format(
"Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
@@ -254,8 +256,9 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
// 2. Transfer file seal signal with mod, which means the file is
transferred completely
if (!send(
socket,
- PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
- modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length()))) {
+ compressIfNeeded(
+ PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
+ modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length())))) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
@@ -269,7 +272,8 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
// 2. Transfer file seal signal without mod, which means the file is
transferred completely
if (!send(
socket,
- PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(),
tsFile.length()))) {
+ compressIfNeeded(
+ PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(),
tsFile.length())))) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index 74375cff112..d4e6706a5a4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -114,13 +114,14 @@ public class IoTDBSchemaRegionAirGapConnector extends
IoTDBDataNodeAirGapConnect
// 2. Transfer file seal signal, which means the snapshots is transferred
completely
if (!send(
socket,
- PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes(
- mtreeSnapshotFile.getName(),
- mtreeSnapshotFile.length(),
- Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.getName()
: null,
- Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length()
: 0,
- pipeSchemaRegionSnapshotEvent.getDatabaseName(),
- pipeSchemaRegionSnapshotEvent.toSealTypeString()))) {
+ compressIfNeeded(
+ PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes(
+ mtreeSnapshotFile.getName(),
+ mtreeSnapshotFile.length(),
+ Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.getName() : null,
+ Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.length() : 0,
+ pipeSchemaRegionSnapshotEvent.getDatabaseName(),
+ pipeSchemaRegionSnapshotEvent.toSealTypeString())))) {
final String errorMessage =
String.format(
"Seal schema region snapshot file %s and %s error. Socket %s.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 74cbfe2e39b..c9f076b0f04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -185,10 +185,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
final TPipeTransferReq pipeTransferReq =
- Objects.isNull(insertNode)
- ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer())
- :
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode);
+ compressIfNeeded(
+ Objects.isNull(insertNode)
+ ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+ :
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
final PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler =
new PipeTransferTabletInsertNodeEventHandler(
pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
@@ -207,10 +208,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return;
}
- final PipeTransferTabletRawReq pipeTransferTabletRawReq =
- PipeTransferTabletRawReq.toTPipeTransferReq(
- pipeRawTabletInsertionEvent.convertToTablet(),
- pipeRawTabletInsertionEvent.isAligned());
+ final TPipeTransferReq pipeTransferTabletRawReq =
+ compressIfNeeded(
+ PipeTransferTabletRawReq.toTPipeTransferReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned()));
final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
new PipeTransferTabletRawEventHandler(
pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 500ed4cab98..88e41343950 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -59,7 +60,11 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
// Deep copy to keep Ids' and events' reference
requestCommitIds = batch.deepCopyRequestCommitIds();
events = batch.deepCopyEvents();
- req = batch.toTPipeTransferReq();
+ // Wrap the batch request in a compressed request only when RPC compression is
+ // enabled; otherwise send the plain batch request. (The branches were inverted.)
+ req =
+ connector.isRpcCompressionEnabled()
+ ? PipeTransferCompressedReq.toTPipeTransferReq(
+ batch.toTPipeTransferReq(), connector.getCompressors())
+ : batch.toTPipeTransferReq();
this.connector = connector;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index aebff325920..07669de76bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -31,6 +32,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
@@ -134,12 +136,19 @@ public class PipeTransferTsFileInsertionEventHandler
readLength == readFileBufferSize
? readBuffer
: Arrays.copyOfRange(readBuffer, 0, readLength);
+ // Build the plain (uncompressed) file-piece request here. It is wrapped in a
+ // compressed request exactly once, below, and only when RPC compression is enabled.
+ final TPipeTransferReq uncompressedReq =
+ transferMod
+ ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(
+ currentFile.getName(), position, payload)
+ : PipeTransferTsFilePieceReq.toTPipeTransferReq(
+ currentFile.getName(), position, payload);
client.pipeTransfer(
- transferMod
- ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(
- currentFile.getName(), position, payload)
- : PipeTransferTsFilePieceReq.toTPipeTransferReq(
- currentFile.getName(), position, payload),
+ connector.isRpcCompressionEnabled()
+ ? PipeTransferCompressedReq.toTPipeTransferReq(
+ uncompressedReq, connector.getCompressors())
+ : uncompressedReq,
this);
position += readLength;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 378e6f74281..33c97ce27f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -117,8 +117,9 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
clientAndStatus
.getLeft()
.pipeTransfer(
- PipeTransferPlanNodeReq.toTPipeTransferReq(
- pipeSchemaRegionWritePlanEvent.getPlanNode()));
+ compressIfNeeded(
+ PipeTransferPlanNodeReq.toTPipeTransferReq(
+ pipeSchemaRegionWritePlanEvent.getPlanNode())));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 568baa13144..cb93bffd411 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -176,7 +176,10 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
final TPipeTransferResp resp;
try {
- resp =
clientAndStatus.getLeft().pipeTransfer(batchToTransfer.toTPipeTransferReq());
+ resp =
+ clientAndStatus
+ .getLeft()
+
.pipeTransfer(compressIfNeeded(batchToTransfer.toTPipeTransferReq()));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
@@ -239,15 +242,18 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
resp =
clientAndStatus
.getLeft()
-
.pipeTransfer(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
+ .pipeTransfer(
+ compressIfNeeded(
+
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)));
} else {
clientAndStatus = clientManager.getClient();
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
- PipeTransferTabletBinaryReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
+ compressIfNeeded(
+ PipeTransferTabletBinaryReq.toTPipeTransferReq(
+
pipeInsertNodeTabletInsertionEvent.getByteBuffer())));
}
} catch (final Exception e) {
if (clientAndStatus != null) {
@@ -303,9 +309,10 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
clientAndStatus
.getLeft()
.pipeTransfer(
- PipeTransferTabletRawReq.toTPipeTransferReq(
- pipeRawTabletInsertionEvent.convertToTablet(),
- pipeRawTabletInsertionEvent.isAligned()));
+ compressIfNeeded(
+ PipeTransferTabletRawReq.toTPipeTransferReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned())));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
@@ -364,8 +371,12 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
clientAndStatus
.getLeft()
.pipeTransfer(
- PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
- modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length()));
+ compressIfNeeded(
+ PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+ modFile.getName(),
+ modFile.length(),
+ tsFile.getName(),
+ tsFile.length())));
} catch (final Exception e) {
clientAndStatus.setRight(false);
clientManager.adjustTimeoutIfNecessary(e);
@@ -381,8 +392,9 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
clientAndStatus
.getLeft()
.pipeTransfer(
- PipeTransferTsFileSealReq.toTPipeTransferReq(
- tsFile.getName(), tsFile.length()));
+ compressIfNeeded(
+ PipeTransferTsFileSealReq.toTPipeTransferReq(
+ tsFile.getName(), tsFile.length())));
} catch (final Exception e) {
clientAndStatus.setRight(false);
clientManager.adjustTimeoutIfNecessary(e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 00ae9861dcf..667050320d0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -103,13 +103,14 @@ public class IoTDBSchemaRegionConnector extends
IoTDBDataNodeSyncConnector {
clientAndStatus
.getLeft()
.pipeTransfer(
- PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq(
- mTreeSnapshotFile.getName(),
- mTreeSnapshotFile.length(),
- Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.getName() : null,
- Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.length() : 0,
- snapshotEvent.getDatabaseName(),
- snapshotEvent.toSealTypeString()));
+ compressIfNeeded(
+ PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq(
+ mTreeSnapshotFile.getName(),
+ mTreeSnapshotFile.length(),
+ Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.getName() : null,
+ Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.length() : 0,
+ snapshotEvent.getDatabaseName(),
+ snapshotEvent.toSealTypeString())));
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c5f1d5fa99d..dfd8be444b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
@@ -188,6 +189,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
// Config requests will first be received by the DataNode receiver,
// then transferred to ConfigNode receiver to execute.
return handleTransferConfigPlan(req);
+ case TRANSFER_COMPRESSED:
+ return
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
default:
break;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index bb9d075910d..331c4d79a2f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -155,6 +155,24 @@ public class PipeConnectorConstant {
CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY,
CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY)));
+ // Parameter keys for the compressor setting, in "connector." and "sink." spellings.
+ // NOTE(review): presumably the two keys are aliases of each other — confirm at the call site.
+ public static final String CONNECTOR_COMPRESSOR_KEY = "connector.compressor";
+ public static final String SINK_COMPRESSOR_KEY = "sink.compressor";
+ // Default is the empty string.
+ // NOTE(review): presumably this means "no compression" — confirm in the connector that reads it.
+ public static final String CONNECTOR_COMPRESSOR_DEFAULT_VALUE = "";
+ // Names of the individual compression algorithms.
+ public static final String CONNECTOR_COMPRESSOR_SNAPPY = "snappy";
+ public static final String CONNECTOR_COMPRESSOR_GZIP = "gzip";
+ public static final String CONNECTOR_COMPRESSOR_LZ4 = "lz4";
+ public static final String CONNECTOR_COMPRESSOR_ZSTD = "zstd";
+ public static final String CONNECTOR_COMPRESSOR_LZMA2 = "lzma2";
+ // Unmodifiable set containing exactly the five algorithm names declared above.
+ public static final Set<String> CONNECTOR_COMPRESSOR_SET =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ CONNECTOR_COMPRESSOR_SNAPPY,
+ CONNECTOR_COMPRESSOR_GZIP,
+ CONNECTOR_COMPRESSOR_LZ4,
+ CONNECTOR_COMPRESSOR_ZSTD,
+ CONNECTOR_COMPRESSOR_LZMA2)));
+
public static final String SINK_TOPIC_KEY = "sink.topic";
public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
new file mode 100644
index 00000000000..4d5113e4c20
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import java.io.IOException;
+
+/**
+ * Base class of the pipe compressors. Every instance is bound to exactly one
+ * {@link PipeCompressionType}; {@link #serialize()} exposes that type's one-byte index.
+ */
+public abstract class PipeCompressor {
+
+ /** The available compression algorithms, each paired with the byte index it is identified by. */
+ public enum PipeCompressionType {
+ SNAPPY((byte) 0),
+ GZIP((byte) 1),
+ LZ4((byte) 2),
+ ZSTD((byte) 3),
+ LZMA2((byte) 4);
+
+ final byte index;
+
+ PipeCompressionType(byte index) {
+ this.index = index;
+ }
+
+ /** Returns the byte index assigned to this compression type. */
+ public byte getIndex() {
+ return index;
+ }
+ }
+
+ private final PipeCompressionType compressionType;
+
+ /**
+ * @param compressionType the algorithm this compressor implements; its index is what
+ * {@link #serialize()} returns
+ */
+ protected PipeCompressor(PipeCompressionType compressionType) {
+ this.compressionType = compressionType;
+ }
+
+ /**
+ * Compresses the given bytes.
+ *
+ * @param data the bytes to compress
+ * @return the compressed bytes
+ * @throws IOException if the underlying compressor fails
+ */
+ public abstract byte[] compress(byte[] data) throws IOException;
+
+ /**
+ * Decompresses the given bytes.
+ *
+ * @param byteArray the bytes to decompress
+ * @return the decompressed bytes
+ * @throws IOException if the underlying decompressor fails
+ */
+ public abstract byte[] decompress(byte[] byteArray) throws IOException;
+
+ /** Returns the one-byte index of this compressor's {@link PipeCompressionType}. */
+ public byte serialize() {
+ return compressionType.getIndex();
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
new file mode 100644
index 00000000000..14ae972fc4f
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_GZIP;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZ4;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZMA2;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SNAPPY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD;
+
+/**
+ * Static registry of the available {@link PipeCompressor} implementations. One instance per
+ * algorithm is created when the class is initialised and can be looked up either by its
+ * configuration name or by the one-byte index returned by {@link PipeCompressor#serialize()}.
+ */
+public class PipeCompressorFactory {
+
+  private static final Map<String, PipeCompressor> COMPRESSOR_NAME_TO_INSTANCE;
+
+  private static final Map<Byte, PipeCompressor> COMPRESSOR_INDEX_TO_INSTANCE;
+
+  static {
+    final Map<String, PipeCompressor> nameToInstance = new HashMap<>();
+    nameToInstance.put(CONNECTOR_COMPRESSOR_SNAPPY, new PipeSnappyCompressor());
+    nameToInstance.put(CONNECTOR_COMPRESSOR_GZIP, new PipeGZIPCompressor());
+    nameToInstance.put(CONNECTOR_COMPRESSOR_LZ4, new PipeLZ4Compressor());
+    nameToInstance.put(CONNECTOR_COMPRESSOR_ZSTD, new PipeZSTDCompressor());
+    nameToInstance.put(CONNECTOR_COMPRESSOR_LZMA2, new PipeLZMA2Compressor());
+
+    // Build the index view from the very same instances: serialize() yields the index of the
+    // compression type each compressor passed to its super constructor, so no second set of
+    // compressor objects is needed and the two lookups can never disagree.
+    final Map<Byte, PipeCompressor> indexToInstance = new HashMap<>();
+    for (final PipeCompressor compressor : nameToInstance.values()) {
+      indexToInstance.put(compressor.serialize(), compressor);
+    }
+
+    COMPRESSOR_NAME_TO_INSTANCE = Collections.unmodifiableMap(nameToInstance);
+    COMPRESSOR_INDEX_TO_INSTANCE = Collections.unmodifiableMap(indexToInstance);
+  }
+
+  /**
+   * Looks up a compressor by its configuration name.
+   *
+   * @param name the compressor name, e.g. one of the {@code CONNECTOR_COMPRESSOR_*} constants
+   * @return the shared compressor instance registered under {@code name}
+   * @throws UnsupportedOperationException if no compressor is registered under {@code name}
+   */
+  public static PipeCompressor getCompressor(String name) {
+    final PipeCompressor compressor = COMPRESSOR_NAME_TO_INSTANCE.get(name);
+    if (compressor == null) {
+      throw new UnsupportedOperationException("PipeCompressor not found for name: " + name);
+    }
+    return compressor;
+  }
+
+  /**
+   * Looks up a compressor by its serialized index.
+   *
+   * @param index the byte produced by {@link PipeCompressor#serialize()}
+   * @return the shared compressor instance registered under {@code index}
+   * @throws UnsupportedOperationException if no compressor is registered under {@code index}
+   */
+  public static PipeCompressor getCompressor(byte index) {
+    final PipeCompressor compressor = COMPRESSOR_INDEX_TO_INSTANCE.get(index);
+    if (compressor == null) {
+      throw new UnsupportedOperationException("PipeCompressor not found for index: " + index);
+    }
+    return compressor;
+  }
+
+  private PipeCompressorFactory() {
+    // Utility class, not meant to be instantiated
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
new file mode 100644
index 00000000000..2cc1887555c
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+/**
+ * {@link PipeCompressor} for {@code PipeCompressionType.GZIP}. Both directions delegate to the
+ * TsFile {@code ICompressor} / {@code IUnCompressor} obtained for {@code CompressionType.GZIP}.
+ */
+public class PipeGZIPCompressor extends PipeCompressor {
+
+ // Codec objects are looked up once and shared by every instance of this class.
+ private static final ICompressor COMPRESSOR =
ICompressor.getCompressor(CompressionType.GZIP);
+ private static final IUnCompressor DECOMPRESSOR =
+ IUnCompressor.getUnCompressor(CompressionType.GZIP);
+
+ public PipeGZIPCompressor() {
+ super(PipeCompressionType.GZIP);
+ }
+
+ /** Returns {@code data} compressed by the shared GZIP compressor. */
+ @Override
+ public byte[] compress(byte[] data) throws IOException {
+ return COMPRESSOR.compress(data);
+ }
+
+ /** Returns {@code byteArray} uncompressed by the shared GZIP decompressor. */
+ @Override
+ public byte[] decompress(byte[] byteArray) throws IOException {
+ return DECOMPRESSOR.uncompress(byteArray);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
new file mode 100644
index 00000000000..5d6db90e661
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+/**
+ * {@link PipeCompressor} for {@code PipeCompressionType.LZ4}. Both directions delegate to the
+ * TsFile {@code ICompressor} / {@code IUnCompressor} obtained for {@code CompressionType.LZ4}.
+ */
+public class PipeLZ4Compressor extends PipeCompressor {
+
+ // Codec objects are looked up once and shared by every instance of this class.
+ private static final ICompressor COMPRESSOR =
ICompressor.getCompressor(CompressionType.LZ4);
+ private static final IUnCompressor DECOMPRESSOR =
+ IUnCompressor.getUnCompressor(CompressionType.LZ4);
+
+ public PipeLZ4Compressor() {
+ super(PipeCompressionType.LZ4);
+ }
+
+ /** Returns {@code data} compressed by the shared LZ4 compressor. */
+ @Override
+ public byte[] compress(byte[] data) throws IOException {
+ return COMPRESSOR.compress(data);
+ }
+
+ /** Returns {@code byteArray} uncompressed by the shared LZ4 decompressor. */
+ @Override
+ public byte[] decompress(byte[] byteArray) throws IOException {
+ return DECOMPRESSOR.uncompress(byteArray);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
new file mode 100644
index 00000000000..9e7c0a8b8d3
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+/**
+ * {@link PipeCompressor} for {@code PipeCompressionType.LZMA2}. Both directions delegate to the
+ * TsFile {@code ICompressor} / {@code IUnCompressor} obtained for {@code CompressionType.LZMA2}.
+ */
+public class PipeLZMA2Compressor extends PipeCompressor {
+
+ // Codec objects are looked up once and shared by every instance of this class.
+ private static final ICompressor COMPRESSOR =
ICompressor.getCompressor(CompressionType.LZMA2);
+ private static final IUnCompressor DECOMPRESSOR =
+ IUnCompressor.getUnCompressor(CompressionType.LZMA2);
+
+ public PipeLZMA2Compressor() {
+ super(PipeCompressionType.LZMA2);
+ }
+
+ /** Returns {@code data} compressed by the shared LZMA2 compressor. */
+ @Override
+ public byte[] compress(byte[] data) throws IOException {
+ return COMPRESSOR.compress(data);
+ }
+
+ /** Returns {@code byteArray} uncompressed by the shared LZMA2 decompressor. */
+ @Override
+ public byte[] decompress(byte[] byteArray) throws IOException {
+ return DECOMPRESSOR.uncompress(byteArray);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
new file mode 100644
index 00000000000..72333c1e929
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+/**
+ * {@link PipeCompressor} for {@code PipeCompressionType.SNAPPY}. Both directions delegate to the
+ * TsFile {@code ICompressor} / {@code IUnCompressor} obtained for {@code CompressionType.SNAPPY}.
+ */
+public class PipeSnappyCompressor extends PipeCompressor {
+
+ // Codec objects are looked up once and shared by every instance of this class.
+ private static final ICompressor COMPRESSOR =
ICompressor.getCompressor(CompressionType.SNAPPY);
+ private static final IUnCompressor DECOMPRESSOR =
+ IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+
+ public PipeSnappyCompressor() {
+ super(PipeCompressionType.SNAPPY);
+ }
+
+ /** Returns {@code data} compressed by the shared Snappy compressor. */
+ @Override
+ public byte[] compress(byte[] data) throws IOException {
+ return COMPRESSOR.compress(data);
+ }
+
+ /** Returns {@code byteArray} uncompressed by the shared Snappy decompressor. */
+ @Override
+ public byte[] decompress(byte[] byteArray) throws IOException {
+ return DECOMPRESSOR.uncompress(byteArray);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
new file mode 100644
index 00000000000..72782353d68
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+/**
+ * {@link PipeCompressor} for {@code PipeCompressionType.ZSTD}. Both directions delegate to the
+ * TsFile {@code ICompressor} / {@code IUnCompressor} obtained for {@code CompressionType.ZSTD}.
+ */
+public class PipeZSTDCompressor extends PipeCompressor {
+
+ // Codec objects are looked up once and shared by every instance of this class.
+ private static final ICompressor COMPRESSOR =
ICompressor.getCompressor(CompressionType.ZSTD);
+ private static final IUnCompressor DECOMPRESSOR =
+ IUnCompressor.getUnCompressor(CompressionType.ZSTD);
+
+ public PipeZSTDCompressor() {
+ super(PipeCompressionType.ZSTD);
+ }
+
+ /** Returns {@code data} compressed by the shared ZSTD compressor. */
+ @Override
+ public byte[] compress(byte[] data) throws IOException {
+ return COMPRESSOR.compress(data);
+ }
+
+ /** Returns {@code byteArray} uncompressed by the shared ZSTD decompressor. */
+ @Override
+ public byte[] decompress(byte[] byteArray) throws IOException {
+ return DECOMPRESSOR.uncompress(byteArray);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
index cbe7b395d1e..003c8b9afb3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
@@ -50,6 +50,9 @@ public enum PipeRequestType {
TRANSFER_CONFIG_PLAN((short) 200),
TRANSFER_CONFIG_SNAPSHOT_PIECE((short) 201),
TRANSFER_CONFIG_SNAPSHOT_SEAL((short) 202),
+
+ // RPC Compression
+ TRANSFER_COMPRESSED((short) 300),
;
private final short type;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
new file mode 100644
index 00000000000..f5e114949bd
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+
+import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
+import
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Helpers that wrap a pipe transfer request into a {@code TRANSFER_COMPRESSED} request and unwrap
+ * it again. The compressed payload is prefixed with the number of compressors and one index byte
+ * per compressor, so the reader can rebuild the compressor chain from the request itself.
+ */
+public class PipeTransferCompressedReq extends TPipeTransferReq {
+
+ /**
+ * Generate a compressed req with provided compressors.
+ *
+ * @param originalReq the request whose version, type and body are concatenated and compressed
+ * @param compressors the compressors to apply, in list order
+ * @return a new request of type {@code TRANSFER_COMPRESSED} carrying the compressed payload
+ * @throws IOException if writing to the in-memory stream or one of the compressors fails
+ */
+ public static TPipeTransferReq toTPipeTransferReq(
+ final TPipeTransferReq originalReq, final List<PipeCompressor>
compressors)
+ throws IOException {
+ // The generated PipeTransferCompressedReq consists of:
+ // version
+ // type: TRANSFER_COMPRESSED
+ // body:
+ // (byte) count of compressors
+ // (bytes) 1 byte for each compressor
+ // compressed req:
+ // (byte) version
+ // (2 bytes) type
+ // (bytes) body
+ final PipeTransferCompressedReq compressedReq = new
PipeTransferCompressedReq();
+ compressedReq.version =
IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+ compressedReq.type = PipeRequestType.TRANSFER_COMPRESSED.getType();
+
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ // The compressor count is narrowed to a single byte before it is written.
+ ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
+ for (final PipeCompressor compressor : compressors) {
+ ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+ }
+
+ byte[] body =
+ BytesUtils.concatByteArrayList(
+ Arrays.asList(
+ new byte[] {originalReq.version},
+ BytesUtils.shortToBytes(originalReq.type),
+ originalReq.getBody()));
+ // Compressors are chained in list order: each one compresses the previous one's output.
+ for (final PipeCompressor compressor : compressors) {
+ body = compressor.compress(body);
+ }
+ outputStream.write(body);
+
+ // Wrap only the first size() bytes of the array returned by getBuf().
+ // NOTE(review): getBuf() presumably exposes the stream's backing array without copying —
+ // confirm against PublicBAOS.
+ compressedReq.body =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ return compressedReq;
+ }
+
+ /**
+ * Get the original req from a compressed req.
+ *
+ * @param transferReq a request whose body starts with the compressor header written by {@link
+ * #toTPipeTransferReq}; the body buffer is read in place, so its position is advanced
+ * @return a new request holding the decompressed version, type and body
+ * @throws IOException if one of the compressors fails to decompress
+ */
+ public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq
transferReq)
+ throws IOException {
+ final ByteBuffer compressedBuffer = transferReq.body;
+
+ // Rebuild the compressor chain from the header: a count byte, then one index byte each.
+ final List<PipeCompressor> compressors = new ArrayList<>();
+ final int compressorsSize = ReadWriteIOUtils.readByte(compressedBuffer);
+ for (int i = 0; i < compressorsSize; ++i) {
+ compressors.add(
+
PipeCompressorFactory.getCompressor(ReadWriteIOUtils.readByte(compressedBuffer)));
+ }
+
+ // Everything left in the buffer is the compressed payload.
+ byte[] body = new byte[compressedBuffer.remaining()];
+ compressedBuffer.get(body);
+
+ // Undo the chain back to front, i.e. in the reverse of the order recorded in the header.
+ for (int i = compressors.size() - 1; i >= 0; --i) {
+ body = compressors.get(i).decompress(body);
+ }
+
+ final ByteBuffer decompressedBuffer = ByteBuffer.wrap(body);
+
+ final TPipeTransferReq decompressedReq = new TPipeTransferReq();
+ decompressedReq.version = ReadWriteIOUtils.readByte(decompressedBuffer);
+ decompressedReq.type = ReadWriteIOUtils.readShort(decompressedBuffer);
+ // Whatever follows the version and type is handed over as the body.
+ decompressedReq.body = decompressedBuffer.slice();
+
+ return decompressedReq;
+ }
+
+ /**
+ * For air-gap connectors. Generate the bytes of a compressed req from the bytes of original req.
+ *
+ * @param rawReqInBytes the bytes of the original request; they are compressed as given
+ * @param compressors the compressors to apply, in list order
+ * @return version, type, compressor header and compressed payload as one new byte array
+ * @throws IOException if writing to the in-memory stream or one of the compressors fails
+ */
+ public static byte[] toTPipeTransferReqBytes(
+ final byte[] rawReqInBytes, final List<PipeCompressor> compressors)
throws IOException {
+ // The generated bytes consists of:
+ // (byte) version
+ // (2 bytes) type: TRANSFER_COMPRESSED
+ // (byte) count of compressors
+ // (bytes) 1 byte for each compressor
+ // compressed req:
+ // (byte) version
+ // (2 bytes) type
+ // (bytes) body
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_COMPRESSED.getType(),
outputStream);
+ // The compressor count is narrowed to a single byte before it is written.
+ ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
+ for (final PipeCompressor compressor : compressors) {
+ ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+ }
+
+ // Compressors are chained in list order: each one compresses the previous one's output.
+ byte[] compressedReq = rawReqInBytes;
+ for (final PipeCompressor compressor : compressors) {
+ compressedReq = compressor.compress(compressedReq);
+ }
+ outputStream.write(compressedReq);
+
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
+ private PipeTransferCompressedReq() {
+ // Empty constructor
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 3034b581d5d..ec2d22ddfa6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -226,9 +226,10 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
: Arrays.copyOfRange(readBuffer, 0, readLength);
if (!send(
socket,
- isMultiFile
- ? getTransferMultiFilePieceBytes(file.getName(), position,
payload)
- : getTransferSingleFilePieceBytes(file.getName(), position,
payload))) {
+ compressIfNeeded(
+ isMultiFile
+ ? getTransferMultiFilePieceBytes(file.getName(), position,
payload)
+ : getTransferSingleFilePieceBytes(file.getName(),
position, payload)))) {
final String errorMessage =
String.format("Transfer file %s error. Socket %s.", file,
socket);
if (mayNeedHandshakeWhenFail()) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 65b32a48396..751f0dec05f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -21,16 +21,21 @@ package org.apache.iotdb.commons.pipe.connector.protocol;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
+import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
+import
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
@@ -38,6 +43,9 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SET;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE;
@@ -57,6 +65,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
@@ -82,6 +91,9 @@ public abstract class IoTDBConnector implements PipeConnector {
protected String loadBalanceStrategy;
+ protected boolean isRpcCompressionEnabled;
+ protected final List<PipeCompressor> compressors = new ArrayList<>();
+
protected boolean isTabletBatchModeEnabled = true;
protected PipeReceiverStatusHandler receiverStatusHandler;
@@ -127,6 +139,36 @@ public abstract class IoTDBConnector implements PipeConnector {
CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
loadBalanceStrategy);
+ final String compressionTypes =
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_COMPRESSOR_KEY, SINK_COMPRESSOR_KEY),
+ CONNECTOR_COMPRESSOR_DEFAULT_VALUE)
+ .toLowerCase();
+ if (!compressionTypes.isEmpty()) {
+ for (final String compressionType : compressionTypes.split(",")) {
+ final String trimmedCompressionType = compressionType.trim();
+ if (trimmedCompressionType.isEmpty()) {
+ continue;
+ }
+
+ validator.validate(
+ arg -> CONNECTOR_COMPRESSOR_SET.contains(trimmedCompressionType),
+ String.format(
+ "Compressor should be one of %s, but got %s.",
+ CONNECTOR_COMPRESSOR_SET, trimmedCompressionType),
+ trimmedCompressionType);
+
compressors.add(PipeCompressorFactory.getCompressor(trimmedCompressionType));
+ }
+ }
+ validator.validate(
+ arg -> compressors.size() <= Byte.MAX_VALUE,
+ String.format(
+ "The number of compressors should be less than or equal to %d, but
got %d.",
+ Byte.MAX_VALUE, compressors.size()),
+ compressors.size());
+ isRpcCompressionEnabled = !compressors.isEmpty();
+
validator.validate(
arg -> arg.equals("retry") || arg.equals("ignore"),
String.format(
@@ -260,6 +302,26 @@ public abstract class IoTDBConnector implements PipeConnector {
}
}
+ /**
+  * Passes a thrift transfer request through the configured compressors when RPC compression is
+  * enabled for this connector.
+  *
+  * @param req the request about to be sent
+  * @return {@code req} itself when compression is disabled; otherwise the request produced by
+  *     {@code PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)}
+  * @throws IOException declared for the call that builds the compressed request
+  */
+ protected TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws IOException {
+   // Fast path: no compressor configured, hand the request back untouched.
+   if (!isRpcCompressionEnabled) {
+     return req;
+   }
+   return PipeTransferCompressedReq.toTPipeTransferReq(req, compressors);
+ }
+
+ /**
+  * Byte-array counterpart of the request-based overload: passes an already serialized request
+  * through the configured compressors when RPC compression is enabled.
+  *
+  * @param reqInBytes the serialized request about to be sent
+  * @return {@code reqInBytes} itself when compression is disabled; otherwise the bytes produced by
+  *     {@code PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, compressors)}
+  * @throws IOException declared for the call that builds the compressed bytes
+  */
+ protected byte[] compressIfNeeded(byte[] reqInBytes) throws IOException {
+   // Fast path: no compressor configured, return the caller's array as is.
+   if (!isRpcCompressionEnabled) {
+     return reqInBytes;
+   }
+   return PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, compressors);
+ }
+
+ /**
+  * Reports whether requests sent by this connector are compressed.
+  *
+  * @return the current value of the {@code isRpcCompressionEnabled} flag
+  */
+ public boolean isRpcCompressionEnabled() {
+   return this.isRpcCompressionEnabled;
+ }
+
+ /**
+  * Exposes the compressors configured for this connector.
+  *
+  * @return the connector's own {@code compressors} list (not a copy), so changes made through the
+  *     returned reference are visible to the connector
+  */
+ public List<PipeCompressor> getCompressors() {
+   return this.compressors;
+ }
+
public PipeReceiverStatusHandler statusHandler() {
return receiverStatusHandler;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index e366584fe8f..4e1ab4d4789 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -166,9 +166,11 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {
clientAndStatus
.getLeft()
.pipeTransfer(
- isMultiFile
- ? getTransferMultiFilePieceReq(file.getName(),
position, payLoad)
- : getTransferSingleFilePieceReq(file.getName(),
position, payLoad)));
+ compressIfNeeded(
+ isMultiFile
+ ?
getTransferMultiFilePieceReq(file.getName(), position, payLoad)
+ : getTransferSingleFilePieceReq(
+ file.getName(), position, payLoad))));
} catch (Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(