This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch kwz
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/kwz by this push:
     new f2785b784a0 air-gap-batch
f2785b784a0 is described below

commit f2785b784a012b8c0434a30ba48a3d5eae5071c8
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 16:48:28 2026 +0800

    air-gap-batch
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |   4 +
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java | 331 ++++++++++++++++++---
 .../airgap/IoTDBDataRegionAirGapSinkTest.java      | 182 +++++++++++
 .../pipe/sink/protocol/IoTDBAirGapSink.java        |  17 +-
 4 files changed, 488 insertions(+), 46 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index a11a1a68f0c..205ce4c16bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSinkMetrics;
 import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
+import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
 import org.apache.iotdb.metrics.type.Histogram;
@@ -299,6 +300,9 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     if (outputPipeSink instanceof IoTDBDataRegionSyncSink) {
       return ((IoTDBDataRegionSyncSink) outputPipeSink).getBatchSize();
     }
+    if (outputPipeSink instanceof IoTDBDataRegionAirGapSink) {
+      return ((IoTDBDataRegionAirGapSink) outputPipeSink).getBatchSize();
+    }
     return 0;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 7f904324bbb..aeb2556792a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.airgap;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -30,6 +31,10 @@ import 
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
@@ -38,6 +43,7 @@ import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFil
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -48,13 +54,26 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.external.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
@@ -67,6 +86,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionAirGapSink.class);
 
+  private PipeTransferBatchReqBuilder tabletBatchBuilder;
   private boolean enableSendTsFileLimit;
 
   @Override
@@ -75,6 +95,10 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       throws Exception {
     super.customize(parameters, configuration);
 
+    if (isTabletBatchModeEnabled) {
+      tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
+    }
+
     enableSendTsFileLimit =
         parameters.getBooleanOrDefault(
             Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, 
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
@@ -101,7 +125,10 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       // When receiver encountered packet loss, the transfer will time out
       // We need to restore the transfer quickly by retry under this 
circumstance
       socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
-      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+      if (isTabletBatchModeEnabled) {
+        tabletBatchBuilder.onEvent(tabletInsertionEvent);
+        doTransferWrapper(socket);
+      } else if (tabletInsertionEvent instanceof 
PipeInsertNodeTabletInsertionEvent) {
         doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
       } else {
         doTransferWrapper(socket, (PipeRawTabletInsertionEvent) 
tabletInsertionEvent);
@@ -140,6 +167,9 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
     final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
+      if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+        doTransferWrapper(socket);
+      }
       doTransferWrapper(socket, (PipeTsFileInsertionEvent) 
tsFileInsertionEvent);
     } catch (final IOException e) {
       isSocketAlive.set(socketIndex, false);
@@ -155,13 +185,33 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
 
   @Override
   public void transfer(final Event event) throws Exception {
+    // Deletion events are sent immediately and return early, so this branch never reaches the
+    // pending tablet batch flush further down in this method.
+    if (event instanceof PipeDeleteDataNodeEvent) {
+      final int socketIndex = nextSocketIndex();
+      final AirGapSocket socket = sockets.get(socketIndex);
+
+      try {
+        doTransferWrapper(socket, (PipeDeleteDataNodeEvent) event);
+      } catch (final IOException e) {
+        // Mark the socket dead so that it is not treated as alive after a network failure.
+        isSocketAlive.set(socketIndex, false);
+
+        // The failing event is a deletion, not a tsfile; name it correctly for the operator.
+        throw new PipeConnectionException(
+            String.format(
+                "Network error when transfer deletion event %s, because %s.",
+                ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
+            e);
+      }
+      return;
+    }
+
     final int socketIndex = nextSocketIndex();
     final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
-      if (event instanceof PipeDeleteDataNodeEvent) {
-        doTransferWrapper(socket, (PipeDeleteDataNodeEvent) event);
-      } else if (!(event instanceof PipeHeartbeatEvent || event instanceof 
PipeTerminateEvent)) {
+      if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+        doTransferWrapper(socket);
+      }
+
+      if (!(event instanceof PipeHeartbeatEvent || event instanceof 
PipeTerminateEvent)) {
         LOGGER.warn(
             "IoTDBDataRegionAirGapConnector does not support transferring 
generic event: {}.",
             event);
@@ -177,6 +227,63 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
     }
   }
 
+  /**
+   * Sends every batch that {@code tabletBatchBuilder} reports as non-empty and due for emission
+   * over the given socket, then releases the batch.
+   *
+   * <p>Plain batches and TsFile batches are dispatched to their dedicated {@code doTransfer}
+   * overloads; any other batch type is only logged. NOTE(review): an unsupported batch still has
+   * its event references decreased and {@code onSuccess()} invoked below - confirm this is
+   * intended.
+   *
+   * @param socket the air gap socket to write to
+   * @throws IOException propagated from the underlying transfer
+   * @throws WriteProcessException propagated from the TsFile batch transfer
+   */
+  private void doTransferWrapper(final AirGapSocket socket)
+      throws IOException, WriteProcessException {
+    for (final Pair<?, PipeTabletEventBatch> nonEmptyAndShouldEmitBatch :
+        tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) {
+      final PipeTabletEventBatch batch = nonEmptyAndShouldEmitBatch.getRight();
+      if (batch instanceof PipeTabletEventPlainBatch) {
+        doTransfer(socket, (PipeTabletEventPlainBatch) batch);
+      } else if (batch instanceof PipeTabletEventTsFileBatch) {
+        doTransfer(socket, (PipeTabletEventTsFileBatch) batch);
+      } else {
+        LOGGER.warn("Unsupported batch type {}.", batch.getClass());
+      }
+      // Reached only when the transfer above did not throw.
+      
batch.decreaseEventsReferenceCount(IoTDBDataRegionAirGapSink.class.getName(), 
true);
+      batch.onSuccess();
+    }
+  }
+
+  /**
+   * Serializes a plain tablet batch into a single request and sends it over the socket.
+   *
+   * <p>When {@code sendBatch} reports failure, a status carrying an error message is handed to
+   * {@code receiverStatusHandler} together with a string dump of the batched events.
+   *
+   * @param socket the air gap socket to write to
+   * @param batchToTransfer the plain batch whose request and per-pipe byte counts are sent
+   * @throws IOException if serialization or sending fails
+   */
+  private void doTransfer(final AirGapSocket socket, final 
PipeTabletEventPlainBatch batchToTransfer)
+      throws IOException {
+    if (!sendBatch(
+        socket,
+        toTPipeTransferBytes(batchToTransfer.toTPipeTransferReq()),
+        batchToTransfer.getPipe2BytesAccumulated())) {
+      final String errorMessage =
+          String.format("Transfer PipeTransferTabletBatchReq error. Socket: 
%s.", socket);
+      receiverStatusHandler.handle(
+          new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+              .setMessage(errorMessage),
+          errorMessage,
+          batchToTransfer.deepCopyEvents().toString());
+    }
+  }
+
+  /**
+   * Seals the batched tablets into TsFiles, sends each sealed file over the socket and then
+   * deletes the local batch file.
+   *
+   * <p>Each file is sent without a mod file, using the database name paired with it and the file
+   * name as the receiver status context. Deletion is best effort: a file that is already gone is
+   * logged at info level, and any other failure is logged as a warning with its cause.
+   *
+   * @param socket the air gap socket to write to
+   * @param batchToTransfer the TsFile batch to seal and send
+   * @throws IOException if sending a file fails
+   * @throws WriteProcessException if sealing the TsFiles fails
+   */
+  private void doTransfer(
+      final AirGapSocket socket, final PipeTabletEventTsFileBatch batchToTransfer)
+      throws IOException, WriteProcessException {
+    final List<Pair<String, File>> dbTsFilePairs = batchToTransfer.sealTsFiles();
+    final Map<Pair<String, Long>, Double> pipe2WeightMap =
+        batchToTransfer.deepCopyPipe2WeightMap();
+
+    for (final Pair<String, File> dbTsFile : dbTsFilePairs) {
+      doTransfer(
+          pipe2WeightMap, socket, dbTsFile.right, null, dbTsFile.left, dbTsFile.right.getName());
+      try {
+        RetryUtils.retryOnException(
+            () -> {
+              FileUtils.delete(dbTsFile.right);
+              return null;
+            });
+      } catch (final NoSuchFileException e) {
+        LOGGER.info("The file {} is not found, may already be deleted.", dbTsFile);
+      } catch (final Exception e) {
+        // Pass the exception as the last argument so the cause of the failed cleanup is logged.
+        LOGGER.warn(
+            "Failed to delete batch file {}, this file should be deleted manually later",
+            dbTsFile,
+            e);
+      }
+    }
+  }
+
   private void doTransferWrapper(
       final AirGapSocket socket, final PipeDeleteDataNodeEvent 
pipeDeleteDataNodeEvent)
       throws PipeException, IOException {
@@ -319,61 +426,159 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
   private void doTransfer(
       final AirGapSocket socket, final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
       throws PipeException, IOException {
-    final String pipeName = pipeTsFileInsertionEvent.getPipeName();
-    final long creationTime = pipeTsFileInsertionEvent.getCreationTime();
-    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
+    doTransfer(
+        Collections.singletonMap(
+            new Pair<>(
+                pipeTsFileInsertionEvent.getPipeName(), 
pipeTsFileInsertionEvent.getCreationTime()),
+            1.0),
+        socket,
+        pipeTsFileInsertionEvent.getTsFile(),
+        pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver
+            ? pipeTsFileInsertionEvent.getModFile()
+            : null,
+        pipeTsFileInsertionEvent.isTableModelEvent()
+            ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
+            : null,
+        pipeTsFileInsertionEvent.toString());
+  }
+
+  private void doTransfer(
+      final Map<Pair<String, Long>, Double> pipe2WeightMap,
+      final AirGapSocket socket,
+      final File tsFile,
+      final File modFile,
+      final String dataBaseName,
+      final String receiverStatusContext)
+      throws PipeException, IOException {
     final String errorMessage = String.format("Seal file %s error. Socket 
%s.", tsFile, socket);
 
-    // 1. Transfer file piece by piece, and mod if needed
-    if (pipeTsFileInsertionEvent.isWithMod() && 
supportModsIfIsDataNodeReceiver) {
-      final File modFile = pipeTsFileInsertionEvent.getModFile();
-      transferFilePieces(pipeName, creationTime, modFile, socket, true);
-      transferFilePieces(pipeName, creationTime, tsFile, socket, true);
-      // 2. Transfer file seal signal with mod, which means the file is 
transferred completely
-      if (!send(
-          pipeName,
-          creationTime,
+    if (Objects.nonNull(modFile)) {
+      transferFilePieces(pipe2WeightMap, modFile, socket, true);
+      transferFilePieces(pipe2WeightMap, tsFile, socket, true);
+      if (!sendWeighted(
           socket,
           PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
-              modFile.getName(),
-              modFile.length(),
-              tsFile.getName(),
-              tsFile.length(),
-              pipeTsFileInsertionEvent.isTableModelEvent()
-                  ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
-                  : null))) {
+              modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length(), dataBaseName),
+          pipe2WeightMap)) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(errorMessage),
             errorMessage,
-            pipeTsFileInsertionEvent.toString());
+            receiverStatusContext);
       } else {
         LOGGER.info("Successfully transferred file {}.", tsFile);
       }
     } else {
-      transferFilePieces(pipeName, creationTime, tsFile, socket, false);
-      // 2. Transfer file seal signal without mod, which means the file is 
transferred completely
-      if (!send(
-          pipeName,
-          creationTime,
+      transferFilePieces(pipe2WeightMap, tsFile, socket, false);
+      if (!sendWeighted(
           socket,
           PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
-              tsFile.getName(),
-              tsFile.length(),
-              pipeTsFileInsertionEvent.isTableModelEvent()
-                  ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
-                  : null))) {
+              tsFile.getName(), tsFile.length(), dataBaseName),
+          pipe2WeightMap)) {
         receiverStatusHandler.handle(
             new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(errorMessage),
             errorMessage,
-            pipeTsFileInsertionEvent.toString());
+            receiverStatusContext);
       } else {
         LOGGER.info("Successfully transferred file {}.", tsFile);
       }
     }
   }
 
+  /**
+   * Streams {@code file} to the receiver in buffer-sized pieces, attributing each piece to every
+   * pipe in {@code pipe2WeightMap}.
+   *
+   * @param pipe2WeightMap weight per (pipe name, creation time) pair, forwarded to
+   *     {@code sendWeighted}
+   * @param file the file to read and send
+   * @param socket the air gap socket to write to
+   * @param isMultiFile selects the multi-file piece request instead of the single-file one
+   * @throws PipeException declared for the receiver status handling
+   * @throws IOException if reading the file or sending a piece fails
+   */
+  private void transferFilePieces(
+      final Map<Pair<String, Long>, Double> pipe2WeightMap,
+      final File file,
+      final AirGapSocket socket,
+      final boolean isMultiFile)
+      throws PipeException, IOException {
+    final int readFileBufferSize = PIPE_CONFIG.getPipeSinkReadFileBufferSize();
+    final byte[] readBuffer = new byte[readFileBufferSize];
+    long position = 0;
+    try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+      while (true) {
+        // The full buffer size is recorded before every read, including the last read that
+        // returns -1.
+        mayLimitRateAndRecordIO(readFileBufferSize);
+        final int readLength = reader.read(readBuffer);
+        if (readLength == -1) {
+          break;
+        }
+
+        // A short read is copied so that only the bytes actually read are sent.
+        final byte[] payload =
+            readLength == readFileBufferSize
+                ? readBuffer
+                : Arrays.copyOfRange(readBuffer, 0, readLength);
+        if (!sendWeighted(
+            socket,
+            isMultiFile
+                ? getTransferMultiFilePieceBytes(file.getName(), position, 
payload)
+                : getTransferSingleFilePieceBytes(file.getName(), position, 
payload),
+            pipe2WeightMap)) {
+          // NOTE(review): if the handler returns normally, the loop continues with the reader
+          // advanced but position unchanged - presumably the handler throws here; confirm.
+          final String errorMessage =
+              String.format("Transfer file %s error. Socket %s.", file, 
socket);
+          receiverStatusHandler.handle(
+              new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+                  .setMessage(errorMessage),
+              errorMessage,
+              file.toString());
+        } else {
+          position += readLength;
+        }
+      }
+    }
+  }
+
+  /**
+   * Compresses and sends a serialized tablet batch, rate limiting each contributing pipe by its
+   * accumulated byte count scaled with the achieved compression ratio.
+   *
+   * @param socket the air gap socket to write to
+   * @param bytes the serialized request before compression
+   * @param pipe2BytesAccumulated bytes accumulated per (pipe name, creation time) pair
+   * @return the result of {@code sendBytes}
+   * @throws IOException if sending fails
+   */
+  private boolean sendBatch(
+      final AirGapSocket socket,
+      byte[] bytes,
+      final Map<Pair<String, Long>, Long> pipe2BytesAccumulated)
+      throws IOException {
+    final long uncompressedSize = bytes.length;
+    bytes = compressIfNeeded(bytes);
+
+    // The ratio falls back to 1 for an empty payload to avoid dividing by zero.
+    final double compressionRatio =
+        uncompressedSize == 0 ? 1 : (double) bytes.length / uncompressedSize;
+    for (final Map.Entry<Pair<String, Long>, Long> entry : 
pipe2BytesAccumulated.entrySet()) {
+      rateLimitIfNeeded(
+          entry.getKey().getLeft(),
+          entry.getKey().getRight(),
+          socket.getEndPoint(),
+          (long) (entry.getValue() * compressionRatio));
+    }
+    return sendBytes(socket, bytes);
+  }
+
+  /**
+   * Compresses and sends a request whose cost is shared between several pipes: each pipe is rate
+   * limited by the compressed length multiplied by its weight.
+   *
+   * @param socket the air gap socket to write to
+   * @param bytes the serialized request before compression
+   * @param pipe2WeightMap weight per (pipe name, creation time) pair
+   * @return the result of {@code sendBytes}
+   * @throws IOException if sending fails
+   */
+  private boolean sendWeighted(
+      final AirGapSocket socket,
+      byte[] bytes,
+      final Map<Pair<String, Long>, Double> pipe2WeightMap)
+      throws IOException {
+    bytes = compressIfNeeded(bytes);
+
+    for (final Map.Entry<Pair<String, Long>, Double> entry : 
pipe2WeightMap.entrySet()) {
+      rateLimitIfNeeded(
+          entry.getKey().getLeft(),
+          entry.getKey().getRight(),
+          socket.getEndPoint(),
+          (long) (bytes.length * entry.getValue()));
+    }
+    return sendBytes(socket, bytes);
+  }
+
+  /**
+   * Serializes a {@link TPipeTransferReq} into a byte array: the version, then the type, then
+   * the remaining bytes of the body.
+   *
+   * @param req the request to serialize
+   * @return the serialized bytes
+   * @throws IOException if writing to the in-memory stream fails
+   */
+  private byte[] toTPipeTransferBytes(final TPipeTransferReq req) throws 
IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(req.version, outputStream);
+      ReadWriteIOUtils.write(req.type, outputStream);
+
+      // Read through a duplicate so the position of req.body itself is left untouched.
+      final ByteBuffer bodyBuffer = req.body.duplicate();
+      final byte[] body = new byte[bodyBuffer.remaining()];
+      bodyBuffer.get(body);
+      outputStream.write(body);
+
+      return byteArrayOutputStream.toByteArray();
+    }
+  }
+
   @Override
   protected void mayLimitRateAndRecordIO(final long requiredBytes) {
     PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
@@ -402,4 +607,60 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
     }
     return super.compressIfNeeded(reqInBytes);
   }
+
+  /**
+   * Forwards the discard request to the tablet batch builder; does nothing when no builder was
+   * created (batch mode disabled).
+   *
+   * @param pipeNameToDrop name of the pipe whose events are discarded
+   * @param creationTimeToDrop creation time of that pipe
+   * @param regionId the region whose events are discarded
+   */
+  @Override
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    if (Objects.nonNull(tabletBatchBuilder)) {
+      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId);
+    }
+  }
+
+  /**
+   * Returns the size reported by the tablet batch builder, or 0 when no builder exists.
+   *
+   * @return the current batch size, or 0 if batch mode is disabled
+   */
+  public int getBatchSize() {
+    return Objects.nonNull(tabletBatchBuilder) ? tabletBatchBuilder.size() : 0;
+  }
+
+  /**
+   * Closes the tablet batch builder, if batch mode created one, and then the parent sink.
+   *
+   * <p>The parent {@code close()} runs in a {@code finally} block so that it is still invoked
+   * when closing the batch builder throws.
+   */
+  @Override
+  public void close() {
+    try {
+      if (tabletBatchBuilder != null) {
+        tabletBatchBuilder.close();
+      }
+    } finally {
+      super.close();
+    }
+  }
+
+  /**
+   * Passes the tablet batch size histogram to the batch builder; ignored when no builder exists.
+   *
+   * @param tabletBatchSizeHistogram the histogram to forward
+   */
+  @Override
+  public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+    }
+  }
+
+  /**
+   * Passes the TsFile batch size histogram to the batch builder; ignored when no builder exists.
+   *
+   * @param tsFileBatchSizeHistogram the histogram to forward
+   */
+  @Override
+  public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+    }
+  }
+
+  /**
+   * Passes the tablet batch time-interval histogram to the batch builder; ignored when no
+   * builder exists.
+   *
+   * @param tabletBatchTimeIntervalHistogram the histogram to forward
+   */
+  @Override
+  public void setTabletBatchTimeIntervalHistogram(Histogram 
tabletBatchTimeIntervalHistogram) {
+    if (tabletBatchBuilder != null) {
+      
tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+    }
+  }
+
+  /**
+   * Passes the TsFile batch time-interval histogram to the batch builder; ignored when no
+   * builder exists.
+   *
+   * @param tsFileBatchTimeIntervalHistogram the histogram to forward
+   */
+  @Override
+  public void setTsFileBatchTimeIntervalHistogram(Histogram 
tsFileBatchTimeIntervalHistogram) {
+    if (tabletBatchBuilder != null) {
+      
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+    }
+  }
+
+  /**
+   * Passes the event size histogram to the batch builder; ignored when no builder exists.
+   *
+   * @param eventSizeHistogram the histogram to forward
+   */
+  @Override
+  public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
new file mode 100644
index 00000000000..a2d5de56ca5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.airgap;
+
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests the tablet batch mode of {@link IoTDBDataRegionAirGapSink} without a real receiver: a
+ * recording subclass captures every request instead of writing it to a socket.
+ */
+public class IoTDBDataRegionAirGapSinkTest {
+
+  /**
+   * Transfers two raw tablet events in the default batch format and expects them to arrive as one
+   * {@code TRANSFER_TABLET_BATCH_V2} request containing two tablet requests.
+   */
+  @Test
+  public void testTransferTabletBatchOverAirGap() throws Exception {
+    try (final RecordingIoTDBDataRegionAirGapSink sink = new 
RecordingIoTDBDataRegionAirGapSink()) {
+      final PipeParameters parameters = buildParameters(false);
+      sink.validate(new PipeParameterValidator(parameters));
+      sink.customize(
+          parameters,
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+      sink.prepareSocket();
+
+      sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 1L));
+      sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 2L));
+
+      // Sleep past the 200 ms batch delay set in buildParameters; the heartbeat then makes the
+      // sink flush the pending batch (presumably because the delay has elapsed - confirm).
+      Thread.sleep(300L);
+      sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+      Assert.assertEquals(1, sink.sentRequests.size());
+
+      final TPipeTransferReq req = 
toTPipeTransferReq(sink.sentRequests.get(0));
+      Assert.assertEquals(PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType(), 
req.type);
+
+      final PipeTransferTabletBatchReqV2 batchReq =
+          PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req);
+      Assert.assertEquals(2, batchReq.getTabletReqs().size());
+    }
+  }
+
+  /**
+   * Transfers two raw tablet events with the sink format set to {@code tsfile} and expects file
+   * piece and seal requests, but no raw tablet or tablet batch requests.
+   */
+  @Test
+  public void testTransferTsFileBatchOverAirGap() throws Exception {
+    try (final RecordingIoTDBDataRegionAirGapSink sink = new 
RecordingIoTDBDataRegionAirGapSink()) {
+      final PipeParameters parameters = buildParameters(true);
+      sink.validate(new PipeParameterValidator(parameters));
+      sink.customize(
+          parameters,
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+      sink.prepareSocket();
+
+      sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 1L));
+      sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 2L));
+
+      // Same timing as the tablet batch test: wait longer than the 200 ms batch delay, then
+      // send a heartbeat.
+      Thread.sleep(300L);
+      sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+      // Collect the type of every recorded request.
+      final List<Short> requestTypes = new ArrayList<>();
+      for (final byte[] requestBytes : sink.sentRequests) {
+        requestTypes.add(toTPipeTransferReq(requestBytes).type);
+      }
+
+      
Assert.assertTrue(requestTypes.contains(PipeRequestType.TRANSFER_TS_FILE_PIECE.getType()));
+      Assert.assertTrue(
+          
requestTypes.contains(PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD.getType()));
+      
Assert.assertFalse(requestTypes.contains(PipeRequestType.TRANSFER_TABLET_RAW_V2.getType()));
+      Assert.assertFalse(
+          
requestTypes.contains(PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType()));
+    }
+  }
+
+  /**
+   * Builds air gap sink parameters with a 200 ms batch delay and a 1048576 batch size.
+   *
+   * @param useTsFileBatch when true, additionally sets the sink format to {@code tsfile}
+   */
+  private PipeParameters buildParameters(final boolean useTsFileBatch) {
+    final Map<String, String> attributes = new HashMap<>();
+    attributes.put(PipeSinkConstant.CONNECTOR_KEY, 
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
+    attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, 
"127.0.0.1:6668");
+    attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, "200");
+    attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1048576");
+    if (useTsFileBatch) {
+      attributes.put(PipeSinkConstant.CONNECTOR_FORMAT_KEY, "tsfile");
+    }
+    return new PipeParameters(attributes);
+  }
+
+  /**
+   * Creates a raw tablet event for device {@code root.db.d1} with a single INT64 measurement
+   * {@code s1}; {@code value} is used as both the timestamp and the measurement value.
+   */
+  private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
+      final String pipeName, final long creationTime, final long value) {
+    final List<IMeasurementSchema> schemaList =
+        Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64));
+    final Tablet tablet = new Tablet("root.db.d1", schemaList, 1);
+    tablet.addTimestamp(0, value);
+    tablet.addValue("s1", 0, value);
+    return new PipeRawTabletInsertionEvent(
+        false,
+        "root.db",
+        "db",
+        "root.db",
+        tablet,
+        false,
+        pipeName,
+        creationTime,
+        null,
+        null,
+        false);
+  }
+
+  /**
+   * Parses recorded request bytes back into a {@link TPipeTransferReq}: one byte of version, a
+   * short type, and the remaining bytes as the body.
+   */
+  private static TPipeTransferReq toTPipeTransferReq(final byte[] 
requestBytes) {
+    final ByteBuffer buffer = ByteBuffer.wrap(requestBytes);
+
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = ReadWriteIOUtils.readByte(buffer);
+    req.type = ReadWriteIOUtils.readShort(buffer);
+    req.body = buffer.slice();
+    return req;
+  }
+
+  /** Sink under test whose {@code sendBytes} records a copy of each request and reports success. */
+  private static class RecordingIoTDBDataRegionAirGapSink extends 
IoTDBDataRegionAirGapSink {
+
+    private final List<byte[]> sentRequests = new ArrayList<>();
+
+    /** Installs a stub socket at index 0, the only index this sink ever selects. */
+    private void prepareSocket() {
+      sockets.set(0, new TestingAirGapSocket());
+    }
+
+    @Override
+    protected int nextSocketIndex() {
+      return 0;
+    }
+
+    @Override
+    protected boolean sendBytes(final AirGapSocket socket, final byte[] bytes) 
{
+      // Copy the array so later changes to the caller's buffer cannot alter the recording.
+      sentRequests.add(Arrays.copyOf(bytes, bytes.length));
+      return true;
+    }
+
+    /** Socket stub whose timeout setter does nothing. */
+    private static class TestingAirGapSocket extends AirGapSocket {
+
+      private TestingAirGapSocket() {
+        super("127.0.0.1", 6668);
+      }
+
+      @Override
+      public synchronized void setSoTimeout(final int timeout) {
+        // No-op for unit test.
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index 7d84e3bb98f..01b15d80432 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -109,13 +109,6 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
       throws Exception {
     super.customize(parameters, configuration);
 
-    if (isTabletBatchModeEnabled) {
-      LOGGER.warn(
-          "Batch mode is enabled by the given parameters. "
-              + "IoTDBAirGapConnector does not support batch mode. "
-              + "Disable batch mode.");
-    }
-
     for (int i = 0; i < nodeUrls.size(); i++) {
       isSocketAlive.add(false);
       sockets.add(null);
@@ -327,15 +320,17 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
   protected boolean send(
       final String pipeName, final long creationTime, final AirGapSocket 
socket, byte[] bytes)
       throws IOException {
+    // Compression runs first so that the rate limiter is charged with the compressed length.
+    // NOTE(review): both steps now run before sendBytes checks socket.isConnected(), so they
+    // also run for a closed socket - confirm this ordering change is acceptable.
+    bytes = compressIfNeeded(bytes);
+    rateLimitIfNeeded(pipeName, creationTime, socket.getEndPoint(), 
bytes.length);
+    return sendBytes(socket, bytes);
+  }
+
+  protected boolean sendBytes(final AirGapSocket socket, byte[] bytes) throws 
IOException {
     if (!socket.isConnected()) {
       throw new SocketException(
           String.format("Socket %s is closed, will try to handshake", socket));
     }
 
-    bytes = compressIfNeeded(bytes);
-
-    rateLimitIfNeeded(pipeName, creationTime, socket.getEndPoint(), 
bytes.length);
-
     final BufferedOutputStream outputStream = new 
BufferedOutputStream(socket.getOutputStream());
     bytes = enrichWithLengthAndChecksum(bytes);
     outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes);

Reply via email to