This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch kwz
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/kwz by this push:
new f2785b784a0 air-gap-batch
f2785b784a0 is described below
commit f2785b784a012b8c0434a30ba48a3d5eae5071c8
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 16:48:28 2026 +0800
air-gap-batch
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 4 +
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 331 ++++++++++++++++++---
.../airgap/IoTDBDataRegionAirGapSinkTest.java | 182 +++++++++++
.../pipe/sink/protocol/IoTDBAirGapSink.java | 17 +-
4 files changed, 488 insertions(+), 46 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index a11a1a68f0c..205ce4c16bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSinkMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
+import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
import org.apache.iotdb.metrics.type.Histogram;
@@ -299,6 +300,9 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
if (outputPipeSink instanceof IoTDBDataRegionSyncSink) {
return ((IoTDBDataRegionSyncSink) outputPipeSink).getBatchSize();
}
+ if (outputPipeSink instanceof IoTDBDataRegionAirGapSink) {
+ return ((IoTDBDataRegionAirGapSink) outputPipeSink).getBatchSize();
+ }
return 0;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 7f904324bbb..aeb2556792a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -30,6 +31,10 @@ import
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
@@ -38,6 +43,7 @@ import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFil
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -48,13 +54,26 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.external.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.NoSuchFileException;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
@@ -67,6 +86,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionAirGapSink.class);
+ private PipeTransferBatchReqBuilder tabletBatchBuilder;
private boolean enableSendTsFileLimit;
@Override
@@ -75,6 +95,10 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
throws Exception {
super.customize(parameters, configuration);
+ if (isTabletBatchModeEnabled) {
+ tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
+ }
+
enableSendTsFileLimit =
parameters.getBooleanOrDefault(
Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT,
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
@@ -101,7 +125,10 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
// When receiver encountered packet loss, the transfer will time out
// We need to restore the transfer quickly by retry under this
circumstance
socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ if (isTabletBatchModeEnabled) {
+ tabletBatchBuilder.onEvent(tabletInsertionEvent);
+ doTransferWrapper(socket);
+ } else if (tabletInsertionEvent instanceof
PipeInsertNodeTabletInsertionEvent) {
doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
} else {
doTransferWrapper(socket, (PipeRawTabletInsertionEvent)
tabletInsertionEvent);
@@ -140,6 +167,9 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
final AirGapSocket socket = sockets.get(socketIndex);
try {
+ if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+ doTransferWrapper(socket);
+ }
doTransferWrapper(socket, (PipeTsFileInsertionEvent)
tsFileInsertionEvent);
} catch (final IOException e) {
isSocketAlive.set(socketIndex, false);
@@ -155,13 +185,33 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
@Override
public void transfer(final Event event) throws Exception {
+ if (event instanceof PipeDeleteDataNodeEvent) {
+ final int socketIndex = nextSocketIndex();
+ final AirGapSocket socket = sockets.get(socketIndex);
+
+ try {
+ doTransferWrapper(socket, (PipeDeleteDataNodeEvent) event);
+ } catch (final IOException e) {
+ isSocketAlive.set(socketIndex, false);
+
+ throw new PipeConnectionException(
+ String.format(
+ "Network error when transfer tsfile event %s, because %s.",
+ ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
+ e);
+ }
+ return;
+ }
+
final int socketIndex = nextSocketIndex();
final AirGapSocket socket = sockets.get(socketIndex);
try {
- if (event instanceof PipeDeleteDataNodeEvent) {
- doTransferWrapper(socket, (PipeDeleteDataNodeEvent) event);
- } else if (!(event instanceof PipeHeartbeatEvent || event instanceof
PipeTerminateEvent)) {
+ if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+ doTransferWrapper(socket);
+ }
+
+ if (!(event instanceof PipeHeartbeatEvent || event instanceof
PipeTerminateEvent)) {
LOGGER.warn(
"IoTDBDataRegionAirGapConnector does not support transferring
generic event: {}.",
event);
@@ -177,6 +227,63 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
}
}
+ private void doTransferWrapper(final AirGapSocket socket)
+ throws IOException, WriteProcessException {
+ for (final Pair<?, PipeTabletEventBatch> nonEmptyAndShouldEmitBatch :
+ tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) {
+ final PipeTabletEventBatch batch = nonEmptyAndShouldEmitBatch.getRight();
+ if (batch instanceof PipeTabletEventPlainBatch) {
+ doTransfer(socket, (PipeTabletEventPlainBatch) batch);
+ } else if (batch instanceof PipeTabletEventTsFileBatch) {
+ doTransfer(socket, (PipeTabletEventTsFileBatch) batch);
+ } else {
+ LOGGER.warn("Unsupported batch type {}.", batch.getClass());
+ }
+
batch.decreaseEventsReferenceCount(IoTDBDataRegionAirGapSink.class.getName(),
true);
+ batch.onSuccess();
+ }
+ }
+
+ private void doTransfer(final AirGapSocket socket, final
PipeTabletEventPlainBatch batchToTransfer)
+ throws IOException {
+ if (!sendBatch(
+ socket,
+ toTPipeTransferBytes(batchToTransfer.toTPipeTransferReq()),
+ batchToTransfer.getPipe2BytesAccumulated())) {
+ final String errorMessage =
+ String.format("Transfer PipeTransferTabletBatchReq error. Socket:
%s.", socket);
+ receiverStatusHandler.handle(
+ new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(errorMessage),
+ errorMessage,
+ batchToTransfer.deepCopyEvents().toString());
+ }
+ }
+
+ private void doTransfer(final AirGapSocket socket, final
PipeTabletEventTsFileBatch batchToTransfer)
+ throws IOException, WriteProcessException {
+ final List<Pair<String, File>> dbTsFilePairs =
batchToTransfer.sealTsFiles();
+ final Map<Pair<String, Long>, Double> pipe2WeightMap =
+ batchToTransfer.deepCopyPipe2WeightMap();
+
+ for (final Pair<String, File> dbTsFile : dbTsFilePairs) {
+ doTransfer(
+ pipe2WeightMap, socket, dbTsFile.right, null, dbTsFile.left,
dbTsFile.right.getName());
+ try {
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.delete(dbTsFile.right);
+ return null;
+ });
+ } catch (final NoSuchFileException e) {
+ LOGGER.info("The file {} is not found, may already be deleted.",
dbTsFile);
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to delete batch file {}, this file should be deleted
manually later", dbTsFile);
+ }
+ }
+ }
+
private void doTransferWrapper(
final AirGapSocket socket, final PipeDeleteDataNodeEvent
pipeDeleteDataNodeEvent)
throws PipeException, IOException {
@@ -319,61 +426,159 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
private void doTransfer(
final AirGapSocket socket, final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
throws PipeException, IOException {
- final String pipeName = pipeTsFileInsertionEvent.getPipeName();
- final long creationTime = pipeTsFileInsertionEvent.getCreationTime();
- final File tsFile = pipeTsFileInsertionEvent.getTsFile();
+ doTransfer(
+ Collections.singletonMap(
+ new Pair<>(
+ pipeTsFileInsertionEvent.getPipeName(),
pipeTsFileInsertionEvent.getCreationTime()),
+ 1.0),
+ socket,
+ pipeTsFileInsertionEvent.getTsFile(),
+ pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver
+ ? pipeTsFileInsertionEvent.getModFile()
+ : null,
+ pipeTsFileInsertionEvent.isTableModelEvent()
+ ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
+ : null,
+ pipeTsFileInsertionEvent.toString());
+ }
+
+ private void doTransfer(
+ final Map<Pair<String, Long>, Double> pipe2WeightMap,
+ final AirGapSocket socket,
+ final File tsFile,
+ final File modFile,
+ final String dataBaseName,
+ final String receiverStatusContext)
+ throws PipeException, IOException {
final String errorMessage = String.format("Seal file %s error. Socket
%s.", tsFile, socket);
- // 1. Transfer file piece by piece, and mod if needed
- if (pipeTsFileInsertionEvent.isWithMod() &&
supportModsIfIsDataNodeReceiver) {
- final File modFile = pipeTsFileInsertionEvent.getModFile();
- transferFilePieces(pipeName, creationTime, modFile, socket, true);
- transferFilePieces(pipeName, creationTime, tsFile, socket, true);
- // 2. Transfer file seal signal with mod, which means the file is
transferred completely
- if (!send(
- pipeName,
- creationTime,
+ if (Objects.nonNull(modFile)) {
+ transferFilePieces(pipe2WeightMap, modFile, socket, true);
+ transferFilePieces(pipe2WeightMap, tsFile, socket, true);
+ if (!sendWeighted(
socket,
PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
- modFile.getName(),
- modFile.length(),
- tsFile.getName(),
- tsFile.length(),
- pipeTsFileInsertionEvent.isTableModelEvent()
- ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
- : null))) {
+ modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length(), dataBaseName),
+ pipe2WeightMap)) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
- pipeTsFileInsertionEvent.toString());
+ receiverStatusContext);
} else {
LOGGER.info("Successfully transferred file {}.", tsFile);
}
} else {
- transferFilePieces(pipeName, creationTime, tsFile, socket, false);
- // 2. Transfer file seal signal without mod, which means the file is
transferred completely
- if (!send(
- pipeName,
- creationTime,
+ transferFilePieces(pipe2WeightMap, tsFile, socket, false);
+ if (!sendWeighted(
socket,
PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
- tsFile.getName(),
- tsFile.length(),
- pipeTsFileInsertionEvent.isTableModelEvent()
- ? pipeTsFileInsertionEvent.getTableModelDatabaseName()
- : null))) {
+ tsFile.getName(), tsFile.length(), dataBaseName),
+ pipe2WeightMap)) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
- pipeTsFileInsertionEvent.toString());
+ receiverStatusContext);
} else {
LOGGER.info("Successfully transferred file {}.", tsFile);
}
}
}
+ private void transferFilePieces(
+ final Map<Pair<String, Long>, Double> pipe2WeightMap,
+ final File file,
+ final AirGapSocket socket,
+ final boolean isMultiFile)
+ throws PipeException, IOException {
+ final int readFileBufferSize = PIPE_CONFIG.getPipeSinkReadFileBufferSize();
+ final byte[] readBuffer = new byte[readFileBufferSize];
+ long position = 0;
+ try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+ while (true) {
+ mayLimitRateAndRecordIO(readFileBufferSize);
+ final int readLength = reader.read(readBuffer);
+ if (readLength == -1) {
+ break;
+ }
+
+ final byte[] payload =
+ readLength == readFileBufferSize
+ ? readBuffer
+ : Arrays.copyOfRange(readBuffer, 0, readLength);
+ if (!sendWeighted(
+ socket,
+ isMultiFile
+ ? getTransferMultiFilePieceBytes(file.getName(), position,
payload)
+ : getTransferSingleFilePieceBytes(file.getName(), position,
payload),
+ pipe2WeightMap)) {
+ final String errorMessage =
+ String.format("Transfer file %s error. Socket %s.", file,
socket);
+ receiverStatusHandler.handle(
+ new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(errorMessage),
+ errorMessage,
+ file.toString());
+ } else {
+ position += readLength;
+ }
+ }
+ }
+ }
+
+ private boolean sendBatch(
+ final AirGapSocket socket,
+ byte[] bytes,
+ final Map<Pair<String, Long>, Long> pipe2BytesAccumulated)
+ throws IOException {
+ final long uncompressedSize = bytes.length;
+ bytes = compressIfNeeded(bytes);
+
+ final double compressionRatio =
+ uncompressedSize == 0 ? 1 : (double) bytes.length / uncompressedSize;
+ for (final Map.Entry<Pair<String, Long>, Long> entry :
pipe2BytesAccumulated.entrySet()) {
+ rateLimitIfNeeded(
+ entry.getKey().getLeft(),
+ entry.getKey().getRight(),
+ socket.getEndPoint(),
+ (long) (entry.getValue() * compressionRatio));
+ }
+ return sendBytes(socket, bytes);
+ }
+
+ private boolean sendWeighted(
+ final AirGapSocket socket,
+ byte[] bytes,
+ final Map<Pair<String, Long>, Double> pipe2WeightMap)
+ throws IOException {
+ bytes = compressIfNeeded(bytes);
+
+ for (final Map.Entry<Pair<String, Long>, Double> entry :
pipe2WeightMap.entrySet()) {
+ rateLimitIfNeeded(
+ entry.getKey().getLeft(),
+ entry.getKey().getRight(),
+ socket.getEndPoint(),
+ (long) (bytes.length * entry.getValue()));
+ }
+ return sendBytes(socket, bytes);
+ }
+
+ private byte[] toTPipeTransferBytes(final TPipeTransferReq req) throws
IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(req.version, outputStream);
+ ReadWriteIOUtils.write(req.type, outputStream);
+
+ final ByteBuffer bodyBuffer = req.body.duplicate();
+ final byte[] body = new byte[bodyBuffer.remaining()];
+ bodyBuffer.get(body);
+ outputStream.write(body);
+
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
@Override
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
@@ -402,4 +607,60 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
}
return super.compressIfNeeded(reqInBytes);
}
+
+ @Override
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ if (Objects.nonNull(tabletBatchBuilder)) {
+ tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
+ }
+ }
+
+ public int getBatchSize() {
+ return Objects.nonNull(tabletBatchBuilder) ? tabletBatchBuilder.size() : 0;
+ }
+
+ @Override
+ public void close() {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.close();
+ }
+
+ super.close();
+ }
+
+ @Override
+ public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+ }
+ }
+
+ @Override
+ public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+ }
+ }
+
+ @Override
+ public void setTabletBatchTimeIntervalHistogram(Histogram
tabletBatchTimeIntervalHistogram) {
+ if (tabletBatchBuilder != null) {
+
tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+ }
+ }
+
+ @Override
+ public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
+ if (tabletBatchBuilder != null) {
+
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+ }
+ }
+
+ @Override
+ public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
new file mode 100644
index 00000000000..a2d5de56ca5
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.airgap;
+
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IoTDBDataRegionAirGapSinkTest {
+
+ @Test
+ public void testTransferTabletBatchOverAirGap() throws Exception {
+ try (final RecordingIoTDBDataRegionAirGapSink sink = new
RecordingIoTDBDataRegionAirGapSink()) {
+ final PipeParameters parameters = buildParameters(false);
+ sink.validate(new PipeParameterValidator(parameters));
+ sink.customize(
+ parameters,
+ new PipeTaskRuntimeConfiguration(new
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+ sink.prepareSocket();
+
+ sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 1L));
+ sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 2L));
+
+ Thread.sleep(300L);
+ sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+ Assert.assertEquals(1, sink.sentRequests.size());
+
+ final TPipeTransferReq req =
toTPipeTransferReq(sink.sentRequests.get(0));
+ Assert.assertEquals(PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType(),
req.type);
+
+ final PipeTransferTabletBatchReqV2 batchReq =
+ PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req);
+ Assert.assertEquals(2, batchReq.getTabletReqs().size());
+ }
+ }
+
+ @Test
+ public void testTransferTsFileBatchOverAirGap() throws Exception {
+ try (final RecordingIoTDBDataRegionAirGapSink sink = new
RecordingIoTDBDataRegionAirGapSink()) {
+ final PipeParameters parameters = buildParameters(true);
+ sink.validate(new PipeParameterValidator(parameters));
+ sink.customize(
+ parameters,
+ new PipeTaskRuntimeConfiguration(new
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+ sink.prepareSocket();
+
+ sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 1L));
+ sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 2L));
+
+ Thread.sleep(300L);
+ sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+ final List<Short> requestTypes = new ArrayList<>();
+ for (final byte[] requestBytes : sink.sentRequests) {
+ requestTypes.add(toTPipeTransferReq(requestBytes).type);
+ }
+
+
Assert.assertTrue(requestTypes.contains(PipeRequestType.TRANSFER_TS_FILE_PIECE.getType()));
+ Assert.assertTrue(
+
requestTypes.contains(PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD.getType()));
+
Assert.assertFalse(requestTypes.contains(PipeRequestType.TRANSFER_TABLET_RAW_V2.getType()));
+ Assert.assertFalse(
+
requestTypes.contains(PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType()));
+ }
+ }
+
+ private PipeParameters buildParameters(final boolean useTsFileBatch) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(PipeSinkConstant.CONNECTOR_KEY,
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
+ attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY,
"127.0.0.1:6668");
+ attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, "200");
+ attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1048576");
+ if (useTsFileBatch) {
+ attributes.put(PipeSinkConstant.CONNECTOR_FORMAT_KEY, "tsfile");
+ }
+ return new PipeParameters(attributes);
+ }
+
+ private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
+ final String pipeName, final long creationTime, final long value) {
+ final List<IMeasurementSchema> schemaList =
+ Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64));
+ final Tablet tablet = new Tablet("root.db.d1", schemaList, 1);
+ tablet.addTimestamp(0, value);
+ tablet.addValue("s1", 0, value);
+ return new PipeRawTabletInsertionEvent(
+ false,
+ "root.db",
+ "db",
+ "root.db",
+ tablet,
+ false,
+ pipeName,
+ creationTime,
+ null,
+ null,
+ false);
+ }
+
+ private static TPipeTransferReq toTPipeTransferReq(final byte[]
requestBytes) {
+ final ByteBuffer buffer = ByteBuffer.wrap(requestBytes);
+
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = ReadWriteIOUtils.readByte(buffer);
+ req.type = ReadWriteIOUtils.readShort(buffer);
+ req.body = buffer.slice();
+ return req;
+ }
+
+ private static class RecordingIoTDBDataRegionAirGapSink extends
IoTDBDataRegionAirGapSink {
+
+ private final List<byte[]> sentRequests = new ArrayList<>();
+
+ private void prepareSocket() {
+ sockets.set(0, new TestingAirGapSocket());
+ }
+
+ @Override
+ protected int nextSocketIndex() {
+ return 0;
+ }
+
+ @Override
+ protected boolean sendBytes(final AirGapSocket socket, final byte[] bytes)
{
+ sentRequests.add(Arrays.copyOf(bytes, bytes.length));
+ return true;
+ }
+
+ private static class TestingAirGapSocket extends AirGapSocket {
+
+ private TestingAirGapSocket() {
+ super("127.0.0.1", 6668);
+ }
+
+ @Override
+ public synchronized void setSoTimeout(final int timeout) {
+ // No-op for unit test.
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index 7d84e3bb98f..01b15d80432 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -109,13 +109,6 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
throws Exception {
super.customize(parameters, configuration);
- if (isTabletBatchModeEnabled) {
- LOGGER.warn(
- "Batch mode is enabled by the given parameters. "
- + "IoTDBAirGapConnector does not support batch mode. "
- + "Disable batch mode.");
- }
-
for (int i = 0; i < nodeUrls.size(); i++) {
isSocketAlive.add(false);
sockets.add(null);
@@ -327,15 +320,17 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
protected boolean send(
final String pipeName, final long creationTime, final AirGapSocket
socket, byte[] bytes)
throws IOException {
+ bytes = compressIfNeeded(bytes);
+ rateLimitIfNeeded(pipeName, creationTime, socket.getEndPoint(),
bytes.length);
+ return sendBytes(socket, bytes);
+ }
+
+ protected boolean sendBytes(final AirGapSocket socket, byte[] bytes) throws
IOException {
if (!socket.isConnected()) {
throw new SocketException(
String.format("Socket %s is closed, will try to handshake", socket));
}
- bytes = compressIfNeeded(bytes);
-
- rateLimitIfNeeded(pipeName, creationTime, socket.getEndPoint(),
bytes.length);
-
final BufferedOutputStream outputStream = new
BufferedOutputStream(socket.getOutputStream());
bytes = enrichWithLengthAndChecksum(bytes);
outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes);