This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e3aa4d8954c Pipe: RPC compression for data sync connectors (SNAPPY, 
GZIP, LZ4, ZSTD and LZMA2) (#12519)
e3aa4d8954c is described below

commit e3aa4d8954c75ce231d887d41f57bbd942b40bb3
Author: Zikun Ma <[email protected]>
AuthorDate: Wed May 15 18:28:07 2024 +0800

    Pipe: RPC compression for data sync connectors (SNAPPY, GZIP, LZ4, ZSTD and 
LZMA2) (#12519)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../IoTDBPipeConnectorCompressionIT.java           | 182 +++++++++++++++++++++
 .../parameter/PipeParameterValidator.java          |   4 +-
 .../protocol/IoTDBConfigRegionAirGapConnector.java |  27 +--
 .../protocol/IoTDBConfigRegionConnector.java       |  20 ++-
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |   3 +
 .../evolvable/request/PipeTransferPlanNodeReq.java |   1 +
 .../airgap/IoTDBDataNodeAirGapConnector.java       |  12 +-
 .../airgap/IoTDBDataRegionAirGapConnector.java     |  24 +--
 .../airgap/IoTDBSchemaRegionAirGapConnector.java   |  15 +-
 .../async/IoTDBDataRegionAsyncConnector.java       |  18 +-
 .../PipeTransferTabletBatchEventHandler.java       |   7 +-
 .../PipeTransferTsFileInsertionEventHandler.java   |  19 ++-
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |   5 +-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  34 ++--
 .../thrift/sync/IoTDBSchemaRegionConnector.java    |  15 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   3 +
 .../config/constant/PipeConnectorConstant.java     |  18 ++
 .../pipe/connector/compressor/PipeCompressor.java  |  57 +++++++
 .../compressor/PipeCompressorFactory.java          |  80 +++++++++
 .../connector/compressor/PipeGZIPCompressor.java   |  47 ++++++
 .../connector/compressor/PipeLZ4Compressor.java    |  47 ++++++
 .../connector/compressor/PipeLZMA2Compressor.java  |  47 ++++++
 .../connector/compressor/PipeSnappyCompressor.java |  47 ++++++
 .../connector/compressor/PipeZSTDCompressor.java   |  47 ++++++
 .../payload/thrift/request/PipeRequestType.java    |   3 +
 .../thrift/request/PipeTransferCompressedReq.java  | 146 +++++++++++++++++
 .../connector/protocol/IoTDBAirGapConnector.java   |   7 +-
 .../pipe/connector/protocol/IoTDBConnector.java    |  62 +++++++
 .../connector/protocol/IoTDBSslSyncConnector.java  |   8 +-
 29 files changed, 921 insertions(+), 84 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
new file mode 100644
index 00000000000..e5f76a00eb0
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeConnectorCompressionIT extends AbstractPipeDualAutoIT {
+
+  @Override
+  @Before
+  public void setUp() {
+    // Override to enable air-gap
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setPipeAirGapReceiverEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+
+    // 10 min, assert that the operations will not time out
+    senderEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
+    
receiverEnv.getConfig().getConfigNodeConfig().setConnectionTimeoutMs(600000);
+
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
+
+  @Test
+  public void testCompression1() throws Exception {
+    doTest("iotdb-thrift-connector", "stream", true, "snappy");
+  }
+
+  @Test
+  public void testCompression2() throws Exception {
+    doTest("iotdb-thrift-connector", "batch", true, "snappy, lzma2");
+  }
+
+  @Test
+  public void testCompression3() throws Exception {
+    doTest("iotdb-thrift-sync-connector", "stream", false, "snappy, snappy");
+  }
+
+  @Test
+  public void testCompression4() throws Exception {
+    doTest("iotdb-thrift-sync-connector", "batch", true, "gzip, zstd");
+  }
+
+  @Test
+  public void testCompression5() throws Exception {
+    doTest("iotdb-air-gap-connector", "stream", false, "lzma2, lz4");
+  }
+
+  @Test
+  public void testCompression6() throws Exception {
+    doTest("iotdb-air-gap-connector", "batch", true, "lzma2");
+  }
+
+  private void doTest(
+      String connectorType, String realtimeMode, boolean useBatchMode, String 
compressionTypes)
+      throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort =
+        connectorType.contains("air-gap")
+            ? receiverDataNode.getPipeAirGapReceiverPort()
+            : receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s1) values 
(2010-01-01T10:00:00+08:00, 1)",
+              "insert into root.db.d1(time, s1) values 
(2010-01-02T10:00:00+08:00, 2)",
+              "flush"))) {
+        return;
+      }
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor", "iotdb-extractor");
+      extractorAttributes.put("extractor.realtime.mode", realtimeMode);
+
+      processorAttributes.put("processor", "do-nothing-processor");
+
+      connectorAttributes.put("connector", connectorType);
+      connectorAttributes.put("connector.batch.enable", useBatchMode ? "true" 
: "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.user", "root");
+      connectorAttributes.put("connector.password", "root");
+      connectorAttributes.put("connector.compressor", compressionTypes);
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s1) values (now(), 3)",
+              "insert into root.db.d1(time, s1) values (now(), 4)",
+              "insert into root.db.d1(time, s1) values (now(), 5)",
+              "insert into root.db.d1(time, s1) values (now(), 6)",
+              "insert into root.db.d1(time, s1) values (now(), 7)",
+              "insert into root.db.d1(time, s1) values (now(), 8)",
+              "flush"))) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("8,"));
+    }
+  }
+}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
index 01e9507f7fe..1fa0046ccda 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
@@ -69,7 +69,9 @@ public class PipeParameterValidator {
     }
 
     throw new PipeParameterNotValidException(
-        String.format("The value of %s should be one of %s", key, 
Arrays.toString(optionalValues)));
+        String.format(
+            "Invalid value %s of %s. The value should be one of %s",
+            actualValue, key, Arrays.toString(optionalValues)));
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index 3235043fb45..3ca8031d268 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -56,8 +56,9 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
 
   @Override
   protected byte[] generateHandShakeV1Payload() throws IOException {
-    return PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
-        CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    return compressIfNeeded(
+        PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
   }
 
   @Override
@@ -70,7 +71,7 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
 
-    return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
+    return 
compressIfNeeded(PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params));
   }
 
   @Override
@@ -150,8 +151,9 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
       throws PipeException, IOException {
     if (!send(
         socket,
-        PipeTransferConfigPlanReq.toTPipeTransferBytes(
-            pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
+        compressIfNeeded(
+            PipeTransferConfigPlanReq.toTPipeTransferBytes(
+                pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())))) {
       final String errorMessage =
           String.format(
               "Transfer config region write plan %s error. Socket: %s.",
@@ -197,13 +199,14 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
     // 2. Transfer file seal signal, which means the snapshots are transferred 
completely
     if (!send(
         socket,
-        PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
-            snapshot.getName(),
-            snapshot.length(),
-            Objects.nonNull(templateFile) ? templateFile.getName() : null,
-            Objects.nonNull(templateFile) ? templateFile.length() : 0,
-            pipeConfigRegionSnapshotEvent.getFileType(),
-            pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
+        compressIfNeeded(
+            PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
+                snapshot.getName(),
+                snapshot.length(),
+                Objects.nonNull(templateFile) ? templateFile.getName() : null,
+                Objects.nonNull(templateFile) ? templateFile.length() : 0,
+                pipeConfigRegionSnapshotEvent.getFileType(),
+                pipeConfigRegionSnapshotEvent.toSealTypeString())))) {
       final String errorMessage =
           String.format("Seal config region snapshot %s error. Socket %s.", 
snapshot, socket);
       // Send handshake because we don't know whether the receiver side 
configNode
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index c48e29cd72d..a2962c194f3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -127,8 +127,9 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
           clientAndStatus
               .getLeft()
               .pipeTransfer(
-                  PipeTransferConfigPlanReq.toTPipeTransferReq(
-                      pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
+                  compressIfNeeded(
+                      PipeTransferConfigPlanReq.toTPipeTransferReq(
+                          
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())));
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
@@ -192,13 +193,14 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
           clientAndStatus
               .getLeft()
               .pipeTransfer(
-                  PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
-                      snapshotFile.getName(),
-                      snapshotFile.length(),
-                      Objects.nonNull(templateFile) ? templateFile.getName() : 
null,
-                      Objects.nonNull(templateFile) ? templateFile.length() : 
0,
-                      snapshotEvent.getFileType(),
-                      snapshotEvent.toSealTypeString()));
+                  compressIfNeeded(
+                      PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
+                          snapshotFile.getName(),
+                          snapshotFile.length(),
+                          Objects.nonNull(templateFile) ? 
templateFile.getName() : null,
+                          Objects.nonNull(templateFile) ? 
templateFile.length() : 0,
+                          snapshotEvent.getFileType(),
+                          snapshotEvent.toSealTypeString())));
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 502d4b50f73..0a78553516a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
@@ -124,6 +125,8 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_CONFIG_SNAPSHOT_SEAL:
             return handleTransferFileSealV2(
                 PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
+          case TRANSFER_COMPRESSED:
+            return 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
           default:
             break;
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferPlanNodeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferPlanNodeReq.java
index 9209473c3a2..580dbe2ed4c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferPlanNodeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferPlanNodeReq.java
@@ -72,6 +72,7 @@ public class PipeTransferPlanNodeReq extends TPipeTransferReq 
{
   }
 
   /////////////////////////////// Air Gap ///////////////////////////////
+
   public static byte[] toTPipeTransferBytes(PlanNode planNode) throws 
IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 09ecfce07be..01c7de76da2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -88,8 +88,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
 
   @Override
   protected byte[] generateHandShakeV1Payload() throws IOException {
-    return PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
-        CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    return compressIfNeeded(
+        PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
   }
 
   @Override
@@ -102,7 +103,7 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
 
-    return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
+    return 
compressIfNeeded(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params));
   }
 
   protected void doTransferWrapper(
@@ -126,8 +127,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
       throws PipeException, IOException {
     if (!send(
         socket,
-        PipeTransferPlanNodeReq.toTPipeTransferBytes(
-            pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
+        compressIfNeeded(
+            PipeTransferPlanNodeReq.toTPipeTransferBytes(
+                pipeSchemaRegionWritePlanEvent.getPlanNode())))) {
       final String errorMessage =
           String.format(
               "Transfer data node write plan %s error. Socket: %s.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 0548e4da48f..84ca0b0ccdb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -170,10 +170,11 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
     final InsertNode insertNode =
         pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
     final byte[] bytes =
-        Objects.isNull(insertNode)
-            ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
-                pipeInsertNodeTabletInsertionEvent.getByteBuffer())
-            : PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
+        compressIfNeeded(
+            Objects.isNull(insertNode)
+                ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
+                    pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+                : 
PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode));
 
     if (!send(socket, bytes)) {
       final String errorMessage =
@@ -209,9 +210,10 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
       throws PipeException, IOException {
     if (!send(
         socket,
-        PipeTransferTabletRawReq.toTPipeTransferBytes(
-            pipeRawTabletInsertionEvent.convertToTablet(),
-            pipeRawTabletInsertionEvent.isAligned()))) {
+        compressIfNeeded(
+            PipeTransferTabletRawReq.toTPipeTransferBytes(
+                pipeRawTabletInsertionEvent.convertToTablet(),
+                pipeRawTabletInsertionEvent.isAligned())))) {
       final String errorMessage =
           String.format(
               "Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
@@ -254,8 +256,9 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
       // 2. Transfer file seal signal with mod, which means the file is 
transferred completely
       if (!send(
           socket,
-          PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
-              modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length()))) {
+          compressIfNeeded(
+              PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
+                  modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())))) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(errorMessage),
@@ -269,7 +272,8 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
       // 2. Transfer file seal signal without mod, which means the file is 
transferred completely
       if (!send(
           socket,
-          PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), 
tsFile.length()))) {
+          compressIfNeeded(
+              PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), 
tsFile.length())))) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(errorMessage),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index 74375cff112..d4e6706a5a4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -114,13 +114,14 @@ public class IoTDBSchemaRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnect
     // 2. Transfer file seal signal, which means the snapshots is transferred 
completely
     if (!send(
         socket,
-        PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes(
-            mtreeSnapshotFile.getName(),
-            mtreeSnapshotFile.length(),
-            Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.getName() 
: null,
-            Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length() 
: 0,
-            pipeSchemaRegionSnapshotEvent.getDatabaseName(),
-            pipeSchemaRegionSnapshotEvent.toSealTypeString()))) {
+        compressIfNeeded(
+            PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes(
+                mtreeSnapshotFile.getName(),
+                mtreeSnapshotFile.length(),
+                Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.getName() : null,
+                Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.length() : 0,
+                pipeSchemaRegionSnapshotEvent.getDatabaseName(),
+                pipeSchemaRegionSnapshotEvent.toSealTypeString())))) {
       final String errorMessage =
           String.format(
               "Seal schema region snapshot file %s and %s error. Socket %s.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 74cbfe2e39b..c9f076b0f04 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -185,10 +185,11 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
         final InsertNode insertNode =
             
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
         final TPipeTransferReq pipeTransferReq =
-            Objects.isNull(insertNode)
-                ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
-                    pipeInsertNodeTabletInsertionEvent.getByteBuffer())
-                : 
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode);
+            compressIfNeeded(
+                Objects.isNull(insertNode)
+                    ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
+                        pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+                    : 
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
         final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler =
             new PipeTransferTabletInsertNodeEventHandler(
                 pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
@@ -207,10 +208,11 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
           return;
         }
 
-        final PipeTransferTabletRawReq pipeTransferTabletRawReq =
-            PipeTransferTabletRawReq.toTPipeTransferReq(
-                pipeRawTabletInsertionEvent.convertToTablet(),
-                pipeRawTabletInsertionEvent.isAligned());
+        final TPipeTransferReq pipeTransferTabletRawReq =
+            compressIfNeeded(
+                PipeTransferTabletRawReq.toTPipeTransferReq(
+                    pipeRawTabletInsertionEvent.convertToTablet(),
+                    pipeRawTabletInsertionEvent.isAligned()));
         final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
             new PipeTransferTabletRawEventHandler(
                 pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 500ed4cab98..88e41343950 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -59,7 +60,11 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
     // Deep copy to keep Ids' and events' reference
     requestCommitIds = batch.deepCopyRequestCommitIds();
     events = batch.deepCopyEvents();
-    req = batch.toTPipeTransferReq();
+    req =
+        connector.isRpcCompressionEnabled()
+            ? batch.toTPipeTransferReq()
+            : PipeTransferCompressedReq.toTPipeTransferReq(
+                batch.toTPipeTransferReq(), connector.getCompressors());
 
     this.connector = connector;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index aebff325920..07669de76bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
 import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -31,6 +32,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
@@ -134,12 +136,19 @@ public class PipeTransferTsFileInsertionEventHandler
         readLength == readFileBufferSize
             ? readBuffer
             : Arrays.copyOfRange(readBuffer, 0, readLength);
+    final TPipeTransferReq uncompressedReq =
+        PipeTransferCompressedReq.toTPipeTransferReq(
+            transferMod
+                ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(
+                    currentFile.getName(), position, payload)
+                : PipeTransferTsFilePieceReq.toTPipeTransferReq(
+                    currentFile.getName(), position, payload),
+            connector.getCompressors());
     client.pipeTransfer(
-        transferMod
-            ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(
-                currentFile.getName(), position, payload)
-            : PipeTransferTsFilePieceReq.toTPipeTransferReq(
-                currentFile.getName(), position, payload),
+        connector.isRpcCompressionEnabled()
+            ? PipeTransferCompressedReq.toTPipeTransferReq(
+                uncompressedReq, connector.getCompressors())
+            : uncompressedReq,
         this);
     position += readLength;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 378e6f74281..33c97ce27f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -117,8 +117,9 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
           clientAndStatus
               .getLeft()
               .pipeTransfer(
-                  PipeTransferPlanNodeReq.toTPipeTransferReq(
-                      pipeSchemaRegionWritePlanEvent.getPlanNode()));
+                  compressIfNeeded(
+                      PipeTransferPlanNodeReq.toTPipeTransferReq(
+                          pipeSchemaRegionWritePlanEvent.getPlanNode())));
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 568baa13144..cb93bffd411 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -176,7 +176,10 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     final TPipeTransferResp resp;
     try {
-      resp = 
clientAndStatus.getLeft().pipeTransfer(batchToTransfer.toTPipeTransferReq());
+      resp =
+          clientAndStatus
+              .getLeft()
+              
.pipeTransfer(compressIfNeeded(batchToTransfer.toTPipeTransferReq()));
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
@@ -239,15 +242,18 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
         resp =
             clientAndStatus
                 .getLeft()
-                
.pipeTransfer(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
+                .pipeTransfer(
+                    compressIfNeeded(
+                        
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)));
       } else {
         clientAndStatus = clientManager.getClient();
         resp =
             clientAndStatus
                 .getLeft()
                 .pipeTransfer(
-                    PipeTransferTabletBinaryReq.toTPipeTransferReq(
-                        pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
+                    compressIfNeeded(
+                        PipeTransferTabletBinaryReq.toTPipeTransferReq(
+                            
pipeInsertNodeTabletInsertionEvent.getByteBuffer())));
       }
     } catch (final Exception e) {
       if (clientAndStatus != null) {
@@ -303,9 +309,10 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
           clientAndStatus
               .getLeft()
               .pipeTransfer(
-                  PipeTransferTabletRawReq.toTPipeTransferReq(
-                      pipeRawTabletInsertionEvent.convertToTablet(),
-                      pipeRawTabletInsertionEvent.isAligned()));
+                  compressIfNeeded(
+                      PipeTransferTabletRawReq.toTPipeTransferReq(
+                          pipeRawTabletInsertionEvent.convertToTablet(),
+                          pipeRawTabletInsertionEvent.isAligned())));
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
@@ -364,8 +371,12 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
             clientAndStatus
                 .getLeft()
                 .pipeTransfer(
-                    PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
-                        modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length()));
+                    compressIfNeeded(
+                        PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+                            modFile.getName(),
+                            modFile.length(),
+                            tsFile.getName(),
+                            tsFile.length())));
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
         clientManager.adjustTimeoutIfNecessary(e);
@@ -381,8 +392,9 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
             clientAndStatus
                 .getLeft()
                 .pipeTransfer(
-                    PipeTransferTsFileSealReq.toTPipeTransferReq(
-                        tsFile.getName(), tsFile.length()));
+                    compressIfNeeded(
+                        PipeTransferTsFileSealReq.toTPipeTransferReq(
+                            tsFile.getName(), tsFile.length())));
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
         clientManager.adjustTimeoutIfNecessary(e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 00ae9861dcf..667050320d0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -103,13 +103,14 @@ public class IoTDBSchemaRegionConnector extends 
IoTDBDataNodeSyncConnector {
           clientAndStatus
               .getLeft()
               .pipeTransfer(
-                  PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq(
-                      mTreeSnapshotFile.getName(),
-                      mTreeSnapshotFile.length(),
-                      Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.getName() : null,
-                      Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.length() : 0,
-                      snapshotEvent.getDatabaseName(),
-                      snapshotEvent.toSealTypeString()));
+                  compressIfNeeded(
+                      PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq(
+                          mTreeSnapshotFile.getName(),
+                          mTreeSnapshotFile.length(),
+                          Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.getName() : null,
+                          Objects.nonNull(tagLogSnapshotFile) ? 
tagLogSnapshotFile.length() : 0,
+                          snapshotEvent.getDatabaseName(),
+                          snapshotEvent.toSealTypeString())));
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c5f1d5fa99d..dfd8be444b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
@@ -188,6 +189,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             // Config requests will first be received by the DataNode receiver,
             // then transferred to ConfigNode receiver to execute.
             return handleTransferConfigPlan(req);
+          case TRANSFER_COMPRESSED:
+            return 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
           default:
             break;
         }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index bb9d075910d..331c4d79a2f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -155,6 +155,24 @@ public class PipeConnectorConstant {
                   CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY,
                   CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY)));
 
+  public static final String CONNECTOR_COMPRESSOR_KEY = "connector.compressor";
+  public static final String SINK_COMPRESSOR_KEY = "sink.compressor";
+  public static final String CONNECTOR_COMPRESSOR_DEFAULT_VALUE = "";
+  public static final String CONNECTOR_COMPRESSOR_SNAPPY = "snappy";
+  public static final String CONNECTOR_COMPRESSOR_GZIP = "gzip";
+  public static final String CONNECTOR_COMPRESSOR_LZ4 = "lz4";
+  public static final String CONNECTOR_COMPRESSOR_ZSTD = "zstd";
+  public static final String CONNECTOR_COMPRESSOR_LZMA2 = "lzma2";
+  public static final Set<String> CONNECTOR_COMPRESSOR_SET =
+      Collections.unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(
+                  CONNECTOR_COMPRESSOR_SNAPPY,
+                  CONNECTOR_COMPRESSOR_GZIP,
+                  CONNECTOR_COMPRESSOR_LZ4,
+                  CONNECTOR_COMPRESSOR_ZSTD,
+                  CONNECTOR_COMPRESSOR_LZMA2)));
+
   public static final String SINK_TOPIC_KEY = "sink.topic";
   public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
new file mode 100644
index 00000000000..4d5113e4c20
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import java.io.IOException;
+
+public abstract class PipeCompressor {
+
+  public enum PipeCompressionType {
+    SNAPPY((byte) 0),
+    GZIP((byte) 1),
+    LZ4((byte) 2),
+    ZSTD((byte) 3),
+    LZMA2((byte) 4);
+
+    final byte index;
+
+    PipeCompressionType(byte index) {
+      this.index = index;
+    }
+
+    public byte getIndex() {
+      return index;
+    }
+  }
+
+  private final PipeCompressionType compressionType;
+
+  protected PipeCompressor(PipeCompressionType compressionType) {
+    this.compressionType = compressionType;
+  }
+
+  public abstract byte[] compress(byte[] data) throws IOException;
+
+  public abstract byte[] decompress(byte[] byteArray) throws IOException;
+
+  public byte serialize() {
+    return compressionType.getIndex();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
new file mode 100644
index 00000000000..14ae972fc4f
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_GZIP;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZ4;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZMA2;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SNAPPY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD;
+
+public class PipeCompressorFactory {
+
+  private static Map<String, PipeCompressor> COMPRESSOR_NAME_TO_INSTANCE = new 
HashMap<>();
+
+  static {
+    COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_SNAPPY, new 
PipeSnappyCompressor());
+    COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_GZIP, new 
PipeGZIPCompressor());
+    COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_LZ4, new 
PipeLZ4Compressor());
+    COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_ZSTD, new 
PipeZSTDCompressor());
+    COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_LZMA2, new 
PipeLZMA2Compressor());
+    COMPRESSOR_NAME_TO_INSTANCE = 
Collections.unmodifiableMap(COMPRESSOR_NAME_TO_INSTANCE);
+  }
+
+  public static PipeCompressor getCompressor(String name) {
+    final PipeCompressor compressor = COMPRESSOR_NAME_TO_INSTANCE.get(name);
+    if (compressor == null) {
+      throw new UnsupportedOperationException("PipeCompressor not found for 
name: " + name);
+    }
+    return compressor;
+  }
+
+  private static Map<Byte, PipeCompressor> COMPRESSOR_INDEX_TO_INSTANCE = new 
HashMap<>();
+
+  static {
+    COMPRESSOR_INDEX_TO_INSTANCE.put(
+        PipeCompressor.PipeCompressionType.SNAPPY.getIndex(), new 
PipeSnappyCompressor());
+    COMPRESSOR_INDEX_TO_INSTANCE.put(
+        PipeCompressor.PipeCompressionType.GZIP.getIndex(), new 
PipeGZIPCompressor());
+    COMPRESSOR_INDEX_TO_INSTANCE.put(
+        PipeCompressor.PipeCompressionType.LZ4.getIndex(), new 
PipeLZ4Compressor());
+    COMPRESSOR_INDEX_TO_INSTANCE.put(
+        PipeCompressor.PipeCompressionType.ZSTD.getIndex(), new 
PipeZSTDCompressor());
+    COMPRESSOR_INDEX_TO_INSTANCE.put(
+        PipeCompressor.PipeCompressionType.LZMA2.getIndex(), new 
PipeLZMA2Compressor());
+    COMPRESSOR_INDEX_TO_INSTANCE = 
Collections.unmodifiableMap(COMPRESSOR_INDEX_TO_INSTANCE);
+  }
+
+  public static PipeCompressor getCompressor(byte index) {
+    final PipeCompressor compressor = COMPRESSOR_INDEX_TO_INSTANCE.get(index);
+    if (compressor == null) {
+      throw new UnsupportedOperationException("PipeCompressor not found for 
index: " + index);
+    }
+    return compressor;
+  }
+
+  private PipeCompressorFactory() {
+    // Empty constructor
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
new file mode 100644
index 00000000000..2cc1887555c
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+public class PipeGZIPCompressor extends PipeCompressor {
+
+  private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.GZIP);
+  private static final IUnCompressor DECOMPRESSOR =
+      IUnCompressor.getUnCompressor(CompressionType.GZIP);
+
+  public PipeGZIPCompressor() {
+    super(PipeCompressionType.GZIP);
+  }
+
+  @Override
+  public byte[] compress(byte[] data) throws IOException {
+    return COMPRESSOR.compress(data);
+  }
+
+  @Override
+  public byte[] decompress(byte[] byteArray) throws IOException {
+    return DECOMPRESSOR.uncompress(byteArray);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
new file mode 100644
index 00000000000..5d6db90e661
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+public class PipeLZ4Compressor extends PipeCompressor {
+
+  private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.LZ4);
+  private static final IUnCompressor DECOMPRESSOR =
+      IUnCompressor.getUnCompressor(CompressionType.LZ4);
+
+  public PipeLZ4Compressor() {
+    super(PipeCompressionType.LZ4);
+  }
+
+  @Override
+  public byte[] compress(byte[] data) throws IOException {
+    return COMPRESSOR.compress(data);
+  }
+
+  @Override
+  public byte[] decompress(byte[] byteArray) throws IOException {
+    return DECOMPRESSOR.uncompress(byteArray);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
new file mode 100644
index 00000000000..9e7c0a8b8d3
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+public class PipeLZMA2Compressor extends PipeCompressor {
+
+  private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.LZMA2);
+  private static final IUnCompressor DECOMPRESSOR =
+      IUnCompressor.getUnCompressor(CompressionType.LZMA2);
+
+  public PipeLZMA2Compressor() {
+    super(PipeCompressionType.LZMA2);
+  }
+
+  @Override
+  public byte[] compress(byte[] data) throws IOException {
+    return COMPRESSOR.compress(data);
+  }
+
+  @Override
+  public byte[] decompress(byte[] byteArray) throws IOException {
+    return DECOMPRESSOR.uncompress(byteArray);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
new file mode 100644
index 00000000000..72333c1e929
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+public class PipeSnappyCompressor extends PipeCompressor {
+
+  private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.SNAPPY);
+  private static final IUnCompressor DECOMPRESSOR =
+      IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+
+  public PipeSnappyCompressor() {
+    super(PipeCompressionType.SNAPPY);
+  }
+
+  @Override
+  public byte[] compress(byte[] data) throws IOException {
+    return COMPRESSOR.compress(data);
+  }
+
+  @Override
+  public byte[] decompress(byte[] byteArray) throws IOException {
+    return DECOMPRESSOR.uncompress(byteArray);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
new file mode 100644
index 00000000000..72782353d68
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.compressor;
+
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.IOException;
+
+public class PipeZSTDCompressor extends PipeCompressor {
+
+  private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.ZSTD);
+  private static final IUnCompressor DECOMPRESSOR =
+      IUnCompressor.getUnCompressor(CompressionType.ZSTD);
+
+  public PipeZSTDCompressor() {
+    super(PipeCompressionType.ZSTD);
+  }
+
+  @Override
+  public byte[] compress(byte[] data) throws IOException {
+    return COMPRESSOR.compress(data);
+  }
+
+  @Override
+  public byte[] decompress(byte[] byteArray) throws IOException {
+    return DECOMPRESSOR.uncompress(byteArray);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
index cbe7b395d1e..003c8b9afb3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
@@ -50,6 +50,9 @@ public enum PipeRequestType {
   TRANSFER_CONFIG_PLAN((short) 200),
   TRANSFER_CONFIG_SNAPSHOT_PIECE((short) 201),
   TRANSFER_CONFIG_SNAPSHOT_SEAL((short) 202),
+
+  // RPC Compression
+  TRANSFER_COMPRESSED((short) 300),
   ;
 
   private final short type;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
new file mode 100644
index 00000000000..f5e114949bd
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+
+import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
+import 
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeTransferCompressedReq extends TPipeTransferReq {
+
+  /** Generate a compressed req with provided compressors. */
+  public static TPipeTransferReq toTPipeTransferReq(
+      final TPipeTransferReq originalReq, final List<PipeCompressor> 
compressors)
+      throws IOException {
+    // The generated PipeTransferCompressedReq consists of:
+    // version
+    // type: TRANSFER_COMPRESSED
+    // body:
+    //   (byte) count of compressors
+    //   (bytes) 1 byte for each compressor
+    //   compressed req:
+    //     (byte) version
+    //     (2 bytes) type
+    //     (bytes) body
+    final PipeTransferCompressedReq compressedReq = new 
PipeTransferCompressedReq();
+    compressedReq.version = 
IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    compressedReq.type = PipeRequestType.TRANSFER_COMPRESSED.getType();
+
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
+      for (final PipeCompressor compressor : compressors) {
+        ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+      }
+
+      byte[] body =
+          BytesUtils.concatByteArrayList(
+              Arrays.asList(
+                  new byte[] {originalReq.version},
+                  BytesUtils.shortToBytes(originalReq.type),
+                  originalReq.getBody()));
+      for (final PipeCompressor compressor : compressors) {
+        body = compressor.compress(body);
+      }
+      outputStream.write(body);
+
+      compressedReq.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+    return compressedReq;
+  }
+
+  /** Get the original req from a compressed req. */
+  public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq 
transferReq)
+      throws IOException {
+    final ByteBuffer compressedBuffer = transferReq.body;
+
+    final List<PipeCompressor> compressors = new ArrayList<>();
+    final int compressorsSize = ReadWriteIOUtils.readByte(compressedBuffer);
+    for (int i = 0; i < compressorsSize; ++i) {
+      compressors.add(
+          
PipeCompressorFactory.getCompressor(ReadWriteIOUtils.readByte(compressedBuffer)));
+    }
+
+    byte[] body = new byte[compressedBuffer.remaining()];
+    compressedBuffer.get(body);
+
+    for (int i = compressors.size() - 1; i >= 0; --i) {
+      body = compressors.get(i).decompress(body);
+    }
+
+    final ByteBuffer decompressedBuffer = ByteBuffer.wrap(body);
+
+    final TPipeTransferReq decompressedReq = new TPipeTransferReq();
+    decompressedReq.version = ReadWriteIOUtils.readByte(decompressedBuffer);
+    decompressedReq.type = ReadWriteIOUtils.readShort(decompressedBuffer);
+    decompressedReq.body = decompressedBuffer.slice();
+
+    return decompressedReq;
+  }
+
+  /**
+   * For air-gap connectors. Generate the bytes of a compressed req from the 
bytes of original req.
+   */
+  public static byte[] toTPipeTransferReqBytes(
+      final byte[] rawReqInBytes, final List<PipeCompressor> compressors) 
throws IOException {
+    // The generated bytes consists of:
+    // (byte) version
+    // (2 bytes) type: TRANSFER_COMPRESSED
+    // (byte) count of compressors
+    // (bytes) 1 byte for each compressor
+    // compressed req:
+    //   (byte) version
+    //   (2 bytes) type
+    //   (bytes) body
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_COMPRESSED.getType(), 
outputStream);
+      ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
+      for (final PipeCompressor compressor : compressors) {
+        ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+      }
+
+      byte[] compressedReq = rawReqInBytes;
+      for (final PipeCompressor compressor : compressors) {
+        compressedReq = compressor.compress(compressedReq);
+      }
+      outputStream.write(compressedReq);
+
+      return byteArrayOutputStream.toByteArray();
+    }
+  }
+
+  private PipeTransferCompressedReq() {
+    // Empty constructor
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 3034b581d5d..ec2d22ddfa6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -226,9 +226,10 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
                 : Arrays.copyOfRange(readBuffer, 0, readLength);
         if (!send(
             socket,
-            isMultiFile
-                ? getTransferMultiFilePieceBytes(file.getName(), position, 
payload)
-                : getTransferSingleFilePieceBytes(file.getName(), position, 
payload))) {
+            compressIfNeeded(
+                isMultiFile
+                    ? getTransferMultiFilePieceBytes(file.getName(), position, 
payload)
+                    : getTransferSingleFilePieceBytes(file.getName(), 
position, payload)))) {
           final String errorMessage =
               String.format("Transfer file %s error. Socket %s.", file, 
socket);
           if (mayNeedHandshakeWhenFail()) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 65b32a48396..751f0dec05f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -21,16 +21,21 @@ package org.apache.iotdb.commons.pipe.connector.protocol;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
+import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
+import 
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
@@ -38,6 +43,9 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SET;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE;
@@ -57,6 +65,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
@@ -82,6 +91,9 @@ public abstract class IoTDBConnector implements PipeConnector 
{
 
   protected String loadBalanceStrategy;
 
+  protected boolean isRpcCompressionEnabled;
+  protected final List<PipeCompressor> compressors = new ArrayList<>();
+
   protected boolean isTabletBatchModeEnabled = true;
 
   protected PipeReceiverStatusHandler receiverStatusHandler;
@@ -127,6 +139,36 @@ public abstract class IoTDBConnector implements 
PipeConnector {
             CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
         loadBalanceStrategy);
 
+    final String compressionTypes =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_COMPRESSOR_KEY, SINK_COMPRESSOR_KEY),
+                CONNECTOR_COMPRESSOR_DEFAULT_VALUE)
+            .toLowerCase();
+    if (!compressionTypes.isEmpty()) {
+      for (final String compressionType : compressionTypes.split(",")) {
+        final String trimmedCompressionType = compressionType.trim();
+        if (trimmedCompressionType.isEmpty()) {
+          continue;
+        }
+
+        validator.validate(
+            arg -> CONNECTOR_COMPRESSOR_SET.contains(trimmedCompressionType),
+            String.format(
+                "Compressor should be one of %s, but got %s.",
+                CONNECTOR_COMPRESSOR_SET, trimmedCompressionType),
+            trimmedCompressionType);
+        
compressors.add(PipeCompressorFactory.getCompressor(trimmedCompressionType));
+      }
+    }
+    validator.validate(
+        arg -> compressors.size() <= Byte.MAX_VALUE,
+        String.format(
+            "The number of compressors should be less than or equal to %d, but 
got %d.",
+            Byte.MAX_VALUE, compressors.size()),
+        compressors.size());
+    isRpcCompressionEnabled = !compressors.isEmpty();
+
     validator.validate(
         arg -> arg.equals("retry") || arg.equals("ignore"),
         String.format(
@@ -260,6 +302,26 @@ public abstract class IoTDBConnector implements 
PipeConnector {
     }
   }
 
+  protected TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws 
IOException {
+    return isRpcCompressionEnabled
+        ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
+        : req;
+  }
+
+  protected byte[] compressIfNeeded(byte[] reqInBytes) throws IOException {
+    return isRpcCompressionEnabled
+        ? PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, 
compressors)
+        : reqInBytes;
+  }
+
+  public boolean isRpcCompressionEnabled() {
+    return isRpcCompressionEnabled;
+  }
+
+  public List<PipeCompressor> getCompressors() {
+    return compressors;
+  }
+
   public PipeReceiverStatusHandler statusHandler() {
     return receiverStatusHandler;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index e366584fe8f..4e1ab4d4789 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -166,9 +166,11 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
                   clientAndStatus
                       .getLeft()
                       .pipeTransfer(
-                          isMultiFile
-                              ? getTransferMultiFilePieceReq(file.getName(), 
position, payLoad)
-                              : getTransferSingleFilePieceReq(file.getName(), 
position, payLoad)));
+                          compressIfNeeded(
+                              isMultiFile
+                                  ? 
getTransferMultiFilePieceReq(file.getName(), position, payLoad)
+                                  : getTransferSingleFilePieceReq(
+                                      file.getName(), position, payLoad))));
         } catch (Exception e) {
           clientAndStatus.setRight(false);
           throw new PipeConnectionException(

Reply via email to