This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch air-gap-udp-transport
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3144e61c0ff37193181969a04737c330c44b0e71
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 15:44:06 2026 +0800

    Implement UDP transport for air-gap sink
---
 .../protocol/airgap/IoTDBAirGapReceiver.java       | 131 ++++++++---
 .../protocol/airgap/IoTDBAirGapReceiverAgent.java  | 106 ++++++++-
 .../thrift/IoTDBDataNodeReceiverAgent.java         |  18 +-
 .../protocol/airgap/IoTDBAirGapReceiverTest.java   |  37 ++++
 .../airgap/IoTDBDataRegionAirGapSinkTest.java      | 107 ++++++++-
 .../pipe/config/constant/PipeSinkConstant.java     |  20 ++
 .../pipe/sink/protocol/IoTDBAirGapSink.java        | 240 ++++++++++++++++++++-
 7 files changed, 618 insertions(+), 41 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 825ff4c5eac..ff22ed8d9e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -40,10 +40,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.zip.CRC32;
@@ -61,6 +65,9 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
   private final IoTDBDataNodeReceiverAgent agent;
 
   private boolean isELanguagePayload;
+  private OutputStream currentOutputStream;
+  private DatagramSocket currentDatagramSocket;
+  private SocketAddress currentRemoteSocketAddress;
 
   public IoTDBAirGapReceiver(final Socket socket, final long receiverId) {
     this.socket = socket;
@@ -101,33 +108,11 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
     try {
       final byte[] data = readData(inputStream);
+      currentOutputStream = socket.getOutputStream();
 
-      // If check sum failed, it indicates that the length we read may not be 
correct.
-      // Namely, there may be remaining bytes in the socket stream, which will 
fail any subsequent
-      // attempts to read from that.
-      // We directly close the socket here.
-      if (!checkSum(data)) {
-        LOGGER.warn(
-            DataNodePipeMessages.PIPE_AIR_GAP_RECEIVER_CLOSED_BECAUSE_OF, 
receiverId, socket);
-        try {
-          fail();
-        } finally {
-          socket.close();
-        }
-        return;
+      if (!receive(data, null)) {
+        socket.close();
       }
-
-      // Removed the used checksum
-      final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, 
data.length - LONG_LEN);
-
-      // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent
-      final AirGapPseudoTPipeTransferRequest req =
-          (AirGapPseudoTPipeTransferRequest)
-              new AirGapPseudoTPipeTransferRequest()
-                  .setVersion(ReadWriteIOUtils.readByte(byteBuffer))
-                  .setType(ReadWriteIOUtils.readShort(byteBuffer))
-                  .setBody(byteBuffer.slice());
-      handleReq(req, System.currentTimeMillis());
     } catch (final PipeConnectionException e) {
       LOGGER.info(
           DataNodePipeMessages.PIPE_AIR_GAP_RECEIVER_SOCKET_CLOSED_WHEN,
@@ -145,9 +130,82 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
     }
   }
 
+  boolean receive(final byte[] data, final String receiverKey) throws 
IOException {
+    try {
+      // If the checksum does not match, the length we read may be wrong, so
+      // unread bytes may remain in the socket stream and break later reads.
+      // Report the failure; the TCP caller closes the socket on false.
+      if (!checkSum(data)) {
+        LOGGER.warn(
+            DataNodePipeMessages.PIPE_AIR_GAP_RECEIVER_CLOSED_BECAUSE_OF, 
receiverId, socket);
+        fail();
+        return false;
+      }
+
+      handleReq(toTPipeTransferReq(data), System.currentTimeMillis(), 
receiverKey);
+      return true;
+    } catch (final IOException e) {
+      throw e;
+    } catch (final Exception e) {
+      if (currentDatagramSocket != null) {
+        fail();
+      }
+      throw e;
+    } finally {
+      currentOutputStream = null;
+      currentDatagramSocket = null;
+      currentRemoteSocketAddress = null;
+    }
+  }
+
+  void receiveUdp(
+      final DatagramSocket datagramSocket,
+      final DatagramPacket packet,
+      final String receiverKey,
+      final byte[] buffer)
+      throws IOException {
+    isELanguagePayload = false;
+    currentDatagramSocket = datagramSocket;
+    currentRemoteSocketAddress = packet.getSocketAddress();
+    boolean requestHandlingStarted = false;
+    try {
+      final byte[] data = readData(new ByteArrayInputStream(buffer, 0, 
packet.getLength()));
+      requestHandlingStarted = true;
+      receive(data, receiverKey);
+    } catch (final Exception e) {
+      if (!requestHandlingStarted) {
+        fail();
+      }
+      throw e;
+    } finally {
+      currentOutputStream = null;
+      currentDatagramSocket = null;
+      currentRemoteSocketAddress = null;
+    }
+  }
+
+  private AirGapPseudoTPipeTransferRequest toTPipeTransferReq(final byte[] 
data) {
+    // Skip the leading checksum (the first LONG_LEN bytes of data)
+    final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length 
- LONG_LEN);
+
+    // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent
+    return (AirGapPseudoTPipeTransferRequest)
+        new AirGapPseudoTPipeTransferRequest()
+            .setVersion(ReadWriteIOUtils.readByte(byteBuffer))
+            .setType(ReadWriteIOUtils.readShort(byteBuffer))
+            .setBody(byteBuffer.slice());
+  }
+
   private void handleReq(final AirGapPseudoTPipeTransferRequest req, final 
long startTime)
       throws IOException {
-    final TPipeTransferResp resp = agent.receive(req);
+    handleReq(req, startTime, null);
+  }
+
+  private void handleReq(
+      final AirGapPseudoTPipeTransferRequest req, final long startTime, final 
String receiverKey)
+      throws IOException {
+    final TPipeTransferResp resp =
+        receiverKey == null ? agent.receive(req) : agent.receive(receiverKey, 
req);
 
     final TSStatus status = resp.getStatus();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -170,7 +228,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
       
LOGGER.info(DataNodePipeMessages.TEMPORARY_UNAVAILABLE_EXCEPTION_ENCOUNTERED_AT_AIR_GAP);
       if (System.currentTimeMillis() - startTime
           < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
-        handleReq(req, startTime);
+        handleReq(req, startTime, receiverKey);
       } else {
         LOGGER.warn(
             
DataNodePipeMessages.PIPE_AIR_GAP_RECEIVER_TEMPORARY_UNAVAILABLE_RETRY, 
receiverId);
@@ -187,14 +245,23 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
   }
 
   private void ok() throws IOException {
-    final OutputStream outputStream = socket.getOutputStream();
-    outputStream.write(AirGapOneByteResponse.OK);
-    outputStream.flush();
+    respond(AirGapOneByteResponse.OK);
   }
 
   private void fail() throws IOException {
-    final OutputStream outputStream = socket.getOutputStream();
-    outputStream.write(AirGapOneByteResponse.FAIL);
+    respond(AirGapOneByteResponse.FAIL);
+  }
+
+  private void respond(final byte[] response) throws IOException {
+    if (currentDatagramSocket != null && currentRemoteSocketAddress != null) {
+      currentDatagramSocket.send(
+          new DatagramPacket(response, response.length, 
currentRemoteSocketAddress));
+      return;
+    }
+
+    final OutputStream outputStream =
+        currentOutputStream != null ? currentOutputStream : 
socket.getOutputStream();
+    outputStream.write(response);
     outputStream.flush();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
index c14ce2054b7..0e8c642ce47 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
@@ -26,14 +26,22 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.protocol.session.ClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -41,15 +49,21 @@ import java.util.concurrent.atomic.AtomicLong;
 public class IoTDBAirGapReceiverAgent implements IService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBAirGapReceiverAgent.class);
+  private static final int UDP_PACKET_MAX_SIZE_IN_BYTES = 65_507;
 
   private final ExecutorService listenExecutor =
       IoTDBThreadPoolFactory.newSingleThreadExecutor(
           ThreadName.PIPE_RECEIVER_AIR_GAP_AGENT.getName());
+  private final ExecutorService udpListenExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.PIPE_RECEIVER_AIR_GAP_AGENT.getName() + "-UDP");
   private final AtomicBoolean allowSubmitListen = new AtomicBoolean(false);
 
   private ServerSocket serverSocket;
+  private DatagramSocket datagramSocket;
 
   private final AtomicLong receiverId = new AtomicLong(0);
+  private final Map<String, ClientSession> udpClientSessions = new 
ConcurrentHashMap<>();
 
   public void listen() {
     try {
@@ -61,7 +75,9 @@ public class IoTDBAirGapReceiverAgent implements IService {
           ThreadName.PIPE_AIR_GAP_RECEIVER.getName() + "-" + airGapReceiverId);
       airGapReceiverThread.start();
     } catch (final IOException e) {
-      
LOGGER.warn(DataNodePipeMessages.UNHANDLED_EXCEPTION_DURING_PIPE_AIR_GAP_RECEIVER,
 e);
+      if (allowSubmitListen.get()) {
+        
LOGGER.warn(DataNodePipeMessages.UNHANDLED_EXCEPTION_DURING_PIPE_AIR_GAP_RECEIVER,
 e);
+      }
     }
 
     if (allowSubmitListen.get()) {
@@ -69,32 +85,97 @@ public class IoTDBAirGapReceiverAgent implements IService {
     }
   }
 
+  public void listenUdp() {
+    while (allowSubmitListen.get() && !datagramSocket.isClosed()) {
+      final byte[] buffer = new byte[UDP_PACKET_MAX_SIZE_IN_BYTES];
+      final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+      try {
+        datagramSocket.receive(packet);
+
+        final long airGapReceiverId = receiverId.incrementAndGet();
+        final IoTDBAirGapReceiver receiver =
+            new IoTDBAirGapReceiver(new Socket(), airGapReceiverId);
+        final String receiverKey = packet.getSocketAddress().toString();
+        final boolean registeredSession = 
registerUdpSessionIfNecessary(packet);
+        try {
+          receiver.receiveUdp(datagramSocket, packet, receiverKey, buffer);
+        } finally {
+          if (registeredSession) {
+            SessionManager.getInstance().removeCurrSession();
+          }
+        }
+      } catch (final IOException e) {
+        if (allowSubmitListen.get()) {
+          
LOGGER.warn(DataNodePipeMessages.UNHANDLED_EXCEPTION_DURING_PIPE_AIR_GAP_RECEIVER,
 e);
+        }
+      } catch (final Exception e) {
+        
LOGGER.warn(DataNodePipeMessages.UNHANDLED_EXCEPTION_DURING_PIPE_AIR_GAP_RECEIVER,
 e);
+      }
+    }
+  }
+
+  private boolean registerUdpSessionIfNecessary(final DatagramPacket packet) {
+    final String receiverKey = packet.getSocketAddress().toString();
+    final ClientSession session =
+        udpClientSessions.computeIfAbsent(
+            receiverKey,
+            key ->
+                new ClientSession(new 
DatagramClientSocket(packet.getAddress(), packet.getPort())));
+    return SessionManager.getInstance().registerSession(session);
+  }
+
   @Override
   public void start() throws StartupException {
     try {
       serverSocket = new 
ServerSocket(PipeConfig.getInstance().getPipeAirGapReceiverPort());
+      datagramSocket = new 
DatagramSocket(PipeConfig.getInstance().getPipeAirGapReceiverPort());
     } catch (final IOException e) {
+      if (Objects.nonNull(serverSocket)) {
+        try {
+          serverSocket.close();
+        } catch (final IOException closeException) {
+          e.addSuppressed(closeException);
+        }
+      }
       throw new StartupException(e);
     }
 
     allowSubmitListen.set(true);
     listenExecutor.submit(this::listen);
+    udpListenExecutor.submit(this::listenUdp);
 
     LOGGER.info(DataNodePipeMessages.IOTDBAIRGAPRECEIVERAGENT_STARTED, 
serverSocket);
   }
 
   @Override
   public void stop() {
+    allowSubmitListen.set(false);
+
     try {
       if (Objects.nonNull(serverSocket)) {
         serverSocket.close();
       }
+      if (Objects.nonNull(datagramSocket)) {
+        datagramSocket.close();
+      }
     } catch (final IOException e) {
       
LOGGER.warn(DataNodePipeMessages.FAILED_TO_CLOSE_IOTDBAIRGAPRECEIVERAGENT_S_SERVER_SOCKET,
 e);
     }
 
-    allowSubmitListen.set(false);
+    udpClientSessions.forEach(
+        (key, session) -> {
+          final boolean registeredSession = 
SessionManager.getInstance().registerSession(session);
+          try {
+            PipeDataNodeAgent.receiver().thrift().handleClientExit(key);
+          } finally {
+            if (registeredSession) {
+              SessionManager.getInstance().removeCurrSession();
+            }
+          }
+        });
+    udpClientSessions.clear();
     listenExecutor.shutdown();
+    udpListenExecutor.shutdown();
 
     LOGGER.info(DataNodePipeMessages.IOTDBAIRGAPRECEIVERAGENT_STOPPED, 
serverSocket);
   }
@@ -103,4 +184,25 @@ public class IoTDBAirGapReceiverAgent implements IService {
   public ServiceType getID() {
     return ServiceType.AIR_GAP_SERVICE;
   }
+
+  private static class DatagramClientSocket extends Socket {
+
+    private final InetAddress address;
+    private final int port;
+
+    private DatagramClientSocket(final InetAddress address, final int port) {
+      this.address = address;
+      this.port = port;
+    }
+
+    @Override
+    public InetAddress getInetAddress() {
+      return address;
+    }
+
+    @Override
+    public int getPort() {
+      return port;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
index 8a995f7656d..d761bb711fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
@@ -24,9 +24,13 @@ import 
org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver.TwoStageAggregateReceiver;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent {
 
   private final ThreadLocal<IoTDBReceiver> receiverThreadLocal = new 
ThreadLocal<>();
+  private final Map<String, IoTDBReceiver> receiverMap = new 
ConcurrentHashMap<>();
 
   @Override
   protected void initConstructors() {
@@ -38,16 +42,24 @@ public class IoTDBDataNodeReceiverAgent extends 
IoTDBReceiverAgent {
 
   @Override
   protected IoTDBReceiver getReceiverWithSpecifiedClient(final String ignore) {
-    return receiverThreadLocal.get();
+    return ignore == null ? receiverThreadLocal.get() : 
receiverMap.get(ignore);
   }
 
   @Override
   protected void setReceiverWithSpecifiedClient(final String ignore, final 
IoTDBReceiver receiver) {
-    receiverThreadLocal.set(receiver);
+    if (ignore == null) {
+      receiverThreadLocal.set(receiver);
+    } else {
+      receiverMap.put(ignore, receiver);
+    }
   }
 
   @Override
   protected void removeReceiverWithSpecifiedClient(final String ignore) {
-    receiverThreadLocal.remove();
+    if (ignore == null) {
+      receiverThreadLocal.remove();
+    } else {
+      receiverMap.remove(ignore);
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
index e23db1f1ca8..9bf59f0a508 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
@@ -33,11 +33,13 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -45,6 +47,8 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.CRC32;
 
 public class IoTDBAirGapReceiverTest {
 
@@ -121,6 +125,19 @@ public class IoTDBAirGapReceiverTest {
     }
   }
 
+  @Test
+  public void testReceiveAirGapPayloadWithSpecifiedReceiverKey() throws 
Exception {
+    final RecordingSocket socket = new RecordingSocket();
+    final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(socket, 4L);
+    final StubIoTDBDataNodeReceiverAgent stubAgent = new 
StubIoTDBDataNodeReceiverAgent();
+    stubAgent.setStubReceiver(
+        "udp-client", new StubReceiver(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+    setField(receiver, "agent", stubAgent);
+
+    Assert.assertTrue(receiver.receive(toAirGapData(toRawRequestBytes()), 
"udp-client"));
+    Assert.assertArrayEquals(AirGapOneByteResponse.OK, 
socket.getWrittenBytes());
+  }
+
   private static void setField(final Object target, final String fieldName, 
final Object value)
       throws Exception {
     final Field field = IoTDBAirGapReceiver.class.getDeclaredField(fieldName);
@@ -147,6 +164,10 @@ public class IoTDBAirGapReceiverTest {
     void setStubReceiver(final IoTDBReceiver receiver) {
       setReceiverWithSpecifiedClient(null, receiver);
     }
+
+    void setStubReceiver(final String key, final IoTDBReceiver receiver) {
+      setReceiverWithSpecifiedClient(key, receiver);
+    }
   }
 
   private static class StubReceiver implements IoTDBReceiver {
@@ -172,4 +193,20 @@ public class IoTDBAirGapReceiverTest {
       return IoTDBSinkRequestVersion.VERSION_1;
     }
   }
+
+  private static byte[] toRawRequestBytes() throws IOException {
+    try (final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write((short) 0, outputStream);
+      return byteArrayOutputStream.toByteArray();
+    }
+  }
+
+  private static byte[] toAirGapData(final byte[] rawRequestBytes) {
+    final CRC32 crc32 = new CRC32();
+    crc32.update(rawRequestBytes, 0, rawRequestBytes.length);
+    return BytesUtils.concatByteArrayList(
+        Arrays.asList(BytesUtils.longToBytes(crc32.getValue()), 
rawRequestBytes));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
index d49ef82ce05..137d8b40dc8 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -39,12 +40,18 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.CRC32;
 
 public class IoTDBDataRegionAirGapSinkTest {
 
@@ -104,7 +111,79 @@ public class IoTDBDataRegionAirGapSinkTest {
     }
   }
 
+  @Test
+  public void testUdpAirGapSlicing() throws Exception {
+    try (final DatagramSocket receiverSocket = new DatagramSocket(0);
+        final UdpTestingIoTDBDataRegionAirGapSink sink =
+            new UdpTestingIoTDBDataRegionAirGapSink()) {
+      final Map<String, String> attributes = buildParameterAttributes(false);
+      attributes.put(PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_KEY, "udp");
+      attributes.put(PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY, 
"1024");
+
+      final PipeParameters parameters = new PipeParameters(attributes);
+      sink.validate(new PipeParameterValidator(parameters));
+      sink.customize(
+          parameters,
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+
+      final List<TPipeTransferReq> receivedRequests = new ArrayList<>();
+      final AtomicBoolean senderFinished = new AtomicBoolean(false);
+      final AtomicReference<Throwable> receiverException = new 
AtomicReference<>();
+      final Thread receiverThread =
+          new Thread(
+              () -> {
+                try {
+                  receiverSocket.setSoTimeout(100);
+                  while (!senderFinished.get()) {
+                    final byte[] buffer = new byte[2048];
+                    final DatagramPacket packet = new DatagramPacket(buffer, 
buffer.length);
+                    try {
+                      receiverSocket.receive(packet);
+                      receivedRequests.add(decodeAirGapDatagram(buffer, 
packet.getLength()));
+                      receiverSocket.send(
+                          new DatagramPacket(
+                              AirGapOneByteResponse.OK,
+                              AirGapOneByteResponse.OK.length,
+                              packet.getAddress(),
+                              packet.getPort()));
+                    } catch (final SocketTimeoutException ignored) {
+                      // Poll until the sender has finished all datagrams.
+                    }
+                  }
+                } catch (final Throwable ignored) {
+                  receiverException.set(ignored);
+                }
+              });
+      receiverThread.start();
+
+      final byte[] request = new byte[2500];
+      request[0] = 1;
+      final byte[] type = 
org.apache.tsfile.utils.BytesUtils.shortToBytes((short) 11);
+      request[1] = type[0];
+      request[2] = type[1];
+
+      try {
+        Assert.assertTrue(sink.sendUdp(receiverSocket.getLocalPort(), 
request));
+      } finally {
+        senderFinished.set(true);
+      }
+
+      receiverThread.join(5000);
+      Assert.assertFalse(receiverThread.isAlive());
+      Assert.assertNull(receiverException.get());
+      Assert.assertTrue(receivedRequests.size() > 1);
+      for (final TPipeTransferReq receivedRequest : receivedRequests) {
+        Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), 
receivedRequest.type);
+        Assert.assertTrue(receivedRequest.body.remaining() < 1024);
+      }
+    }
+  }
+
   private PipeParameters buildParameters(final boolean useTsFileBatch) {
+    return new PipeParameters(buildParameterAttributes(useTsFileBatch));
+  }
+
+  private Map<String, String> buildParameterAttributes(final boolean 
useTsFileBatch) {
     final Map<String, String> attributes = new HashMap<>();
     attributes.put(
         PipeSinkConstant.CONNECTOR_KEY,
@@ -115,7 +194,7 @@ public class IoTDBDataRegionAirGapSinkTest {
     if (useTsFileBatch) {
       attributes.put(PipeSinkConstant.CONNECTOR_FORMAT_KEY, "tsfile");
     }
-    return new PipeParameters(attributes);
+    return attributes;
   }
 
   private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
@@ -149,6 +228,22 @@ public class IoTDBDataRegionAirGapSinkTest {
     return req;
   }
 
+  private static TPipeTransferReq decodeAirGapDatagram(final byte[] datagram, 
final int length) {
+    final ByteBuffer buffer = ByteBuffer.wrap(datagram, 0, length);
+    final int payloadLength = ReadWriteIOUtils.readInt(buffer);
+    Assert.assertEquals(payloadLength, ReadWriteIOUtils.readInt(buffer));
+    Assert.assertEquals(length - 2 * Integer.BYTES, payloadLength);
+
+    final long expectedChecksum = ReadWriteIOUtils.readLong(buffer);
+    final byte[] payload = new byte[payloadLength - Long.BYTES];
+    buffer.get(payload);
+
+    final CRC32 crc32 = new CRC32();
+    crc32.update(payload, 0, payload.length);
+    Assert.assertEquals(expectedChecksum, crc32.getValue());
+    return toTPipeTransferReq(payload);
+  }
+
   private static class RecordingIoTDBDataRegionAirGapSink extends 
IoTDBDataRegionAirGapSink {
 
     private final List<byte[]> sentRequests = new ArrayList<>();
@@ -180,4 +275,14 @@ public class IoTDBDataRegionAirGapSinkTest {
       }
     }
   }
+
+  private static class UdpTestingIoTDBDataRegionAirGapSink extends 
IoTDBDataRegionAirGapSink {
+
+    private boolean sendUdp(final int port, final byte[] request) throws 
Exception {
+      try (final AirGapSocket socket = new AirGapSocket("127.0.0.1", port)) {
+        socket.connectUdp(1000);
+        return sendBytes(socket, request);
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 058b17e2f4f..98fedbbeb35 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -152,6 +152,26 @@ public class PipeSinkConstant {
       "sink.air-gap.handshake-timeout-ms";
   public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE 
= 5000;
 
+  public static final String CONNECTOR_AIR_GAP_TRANSPORT_KEY = 
"connector.air-gap.transport";
+  public static final String SINK_AIR_GAP_TRANSPORT_KEY = 
"sink.air-gap.transport";
+  public static final String CONNECTOR_AIR_GAP_TRANSPORT_TCP_VALUE = "tcp";
+  public static final String CONNECTOR_AIR_GAP_TRANSPORT_UDP_VALUE = "udp";
+  public static final String CONNECTOR_AIR_GAP_TRANSPORT_DEFAULT_VALUE =
+      CONNECTOR_AIR_GAP_TRANSPORT_TCP_VALUE;
+  public static final Set<String> CONNECTOR_AIR_GAP_TRANSPORT_SET =
+      Collections.unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(
+                  CONNECTOR_AIR_GAP_TRANSPORT_TCP_VALUE, 
CONNECTOR_AIR_GAP_TRANSPORT_UDP_VALUE)));
+
+  public static final String CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY =
+      "connector.air-gap.udp.packet-size-bytes";
+  public static final String SINK_AIR_GAP_UDP_PACKET_SIZE_KEY =
+      "sink.air-gap.udp.packet-size-bytes";
+  public static final int CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_DEFAULT_VALUE = 60 
* 1024;
+  public static final int CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MIN_VALUE = 1024;
+  public static final int CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MAX_VALUE = 65_507;
+
   public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = 
"connector.version";
   public static final String SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY = 
"sink.version";
   public static final String 
CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index 6d74102ff19..6099109b781 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -25,25 +25,35 @@ import org.apache.iotdb.commons.i18n.PipeMessages;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
 import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -55,20 +65,36 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_SET;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_UDP_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MAX_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MIN_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_AIR_GAP_TRANSPORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_AIR_GAP_UDP_PACKET_SIZE_KEY;
 import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
 
 @TreeModel
 @TableModel
 public abstract class IoTDBAirGapSink extends IoTDBSink {
 
+  private static final int UDP_ENVELOPE_SIZE = 2 * Integer.BYTES + Long.BYTES;
+  private static final int UDP_SLICE_REQUEST_SERIALIZATION_RESERVED_SIZE = 64;
+
   protected static class AirGapSocket extends Socket {
 
     private final TEndPoint endPoint;
+    private DatagramSocket datagramSocket;
+    private InetAddress datagramAddress;
+    private boolean isUdp;
 
     public AirGapSocket(final String ip, final int port) {
       this.endPoint = new TEndPoint(ip, port);
@@ -78,9 +104,60 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
       return endPoint;
     }
 
+    /**
+     * Switches this socket to UDP mode by opening a {@link DatagramSocket} that is connected to
+     * the end point given at construction time.
+     *
+     * <p>The new datagram socket is configured through a local variable and only stored in the
+     * fields once both the connect and the timeout call have succeeded. If either call fails,
+     * the half-initialized datagram socket is closed before the exception propagates. A datagram
+     * socket left over from an earlier call is closed once its replacement is ready.
+     *
+     * @param timeoutMs receive timeout applied to the datagram socket, in milliseconds
+     * @throws IOException if the host cannot be resolved or the datagram socket cannot be set up
+     */
+    public void connectUdp(final int timeoutMs) throws IOException {
+      final InetAddress address = InetAddress.getByName(endPoint.getIp());
+      final DatagramSocket newSocket = new DatagramSocket();
+      try {
+        newSocket.connect(address, endPoint.getPort());
+        newSocket.setSoTimeout(timeoutMs);
+      } catch (final IOException | RuntimeException e) {
+        // Do not leak the descriptor when the socket cannot be fully configured.
+        newSocket.close();
+        throw e;
+      }
+
+      if (datagramSocket != null) {
+        datagramSocket.close();
+      }
+      datagramAddress = address;
+      datagramSocket = newSocket;
+      isUdp = true;
+    }
+
+    /** Returns the peer address stored by {@link #connectUdp(int)}. */
+    public InetAddress getDatagramAddress() {
+      return this.datagramAddress;
+    }
+
+    /** Returns the datagram socket stored by {@link #connectUdp(int)}. */
+    public DatagramSocket getDatagramSocket() {
+      return this.datagramSocket;
+    }
+
+    /** Returns the UDP flag, which {@link #connectUdp(int)} sets to {@code true}. */
+    public boolean isUdp() {
+      return this.isUdp;
+    }
+
+    /**
+     * Reports the connection state of the active transport: the datagram socket in UDP mode,
+     * otherwise the inherited socket.
+     */
+    @Override
+    public boolean isConnected() {
+      if (!isUdp) {
+        return super.isConnected();
+      }
+      if (datagramSocket == null) {
+        return false;
+      }
+      return datagramSocket.isConnected() && !datagramSocket.isClosed();
+    }
+
+    /**
+     * Applies the timeout to the datagram socket when this socket is in UDP mode and a datagram
+     * socket exists; in every other case the call is delegated to the inherited socket.
+     */
+    @Override
+    public synchronized void setSoTimeout(final int timeout) throws SocketException {
+      final boolean hasUdpSocket = isUdp && datagramSocket != null;
+      if (!hasUdpSocket) {
+        super.setSoTimeout(timeout);
+        return;
+      }
+      datagramSocket.setSoTimeout(timeout);
+    }
+
+    /** Closes the datagram socket, if one exists, and then the inherited socket. */
+    @Override
+    public synchronized void close() throws IOException {
+      final DatagramSocket udpSocket = datagramSocket;
+      if (udpSocket != null) {
+        udpSocket.close();
+      }
+      super.close();
+    }
+
     @Override
     public String toString() {
-      return "AirGapSocket{" + "endPoint=" + endPoint + "} (" + 
super.toString() + ")";
+      return "AirGapSocket{"
+          + "endPoint="
+          + endPoint
+          + ", transport="
+          + (isUdp ? "udp" : "tcp")
+          + "} ("
+          + super.toString()
+          + ")";
     }
   }
 
@@ -98,12 +175,49 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
   private int handshakeTimeoutMs;
 
   private boolean eLanguageEnable;
+  private boolean useUdpTransport;
+  private int udpPacketSizeInBytes;
 
   // The air gap connector does not use clientManager thus we put handshake 
type here
   protected boolean supportModsIfIsDataNodeReceiver = true;
 
   private final Map<TEndPoint, Long> failLogTimes = new HashMap<>();
 
+  /**
+   * Runs the parent validation and then checks the two air gap transport parameters: the
+   * transport name must be contained in the supported set and the UDP packet size must lie
+   * within the configured minimum and maximum.
+   *
+   * @param validator validator that exposes the pipe parameters and collects the checks
+   * @throws Exception if the parent validation or one of the checks fails
+   */
+  @Override
+  public void validate(final PipeParameterValidator validator) throws Exception {
+    super.validate(validator);
+
+    final PipeParameters params = validator.getParameters();
+
+    // The transport name is trimmed and lower-cased before it is compared with the set.
+    final String rawTransport =
+        params.getStringOrDefault(
+            Arrays.asList(CONNECTOR_AIR_GAP_TRANSPORT_KEY, SINK_AIR_GAP_TRANSPORT_KEY),
+            CONNECTOR_AIR_GAP_TRANSPORT_DEFAULT_VALUE);
+    final String transport = rawTransport.trim().toLowerCase();
+    final String transportMessage =
+        String.format(
+            "Air gap transport should be one of %s, but got %s.",
+            CONNECTOR_AIR_GAP_TRANSPORT_SET, transport);
+    validator.validate(
+        ignored -> CONNECTOR_AIR_GAP_TRANSPORT_SET.contains(transport),
+        transportMessage,
+        transport);
+
+    final int udpPacketSize =
+        params.getIntOrDefault(
+            Arrays.asList(CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY, SINK_AIR_GAP_UDP_PACKET_SIZE_KEY),
+            CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_DEFAULT_VALUE);
+    final String packetSizeMessage =
+        String.format(
+            "UDP packet size should be in the range [%d, %d], but got %d.",
+            CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MIN_VALUE,
+            CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MAX_VALUE,
+            udpPacketSize);
+    validator.validate(
+        ignored ->
+            !(udpPacketSize < CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MIN_VALUE
+                || udpPacketSize > CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MAX_VALUE),
+        packetSizeMessage,
+        udpPacketSize);
+  }
+
   @Override
   public void customize(
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
@@ -145,6 +259,23 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
                 CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY, 
SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY),
             CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE);
     LOGGER.info(PipeMessages.AIR_GAP_CUSTOMIZED_E_LANGUAGE, eLanguageEnable);
+
+    useUdpTransport =
+        CONNECTOR_AIR_GAP_TRANSPORT_UDP_VALUE.equals(
+            parameters
+                .getStringOrDefault(
+                    Arrays.asList(CONNECTOR_AIR_GAP_TRANSPORT_KEY, 
SINK_AIR_GAP_TRANSPORT_KEY),
+                    CONNECTOR_AIR_GAP_TRANSPORT_DEFAULT_VALUE)
+                .trim()
+                .toLowerCase());
+    udpPacketSizeInBytes =
+        parameters.getIntOrDefault(
+            Arrays.asList(CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY, 
SINK_AIR_GAP_UDP_PACKET_SIZE_KEY),
+            CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_DEFAULT_VALUE);
+    LOGGER.info(
+        "Air gap transport is {}, udp packet size is {} bytes.",
+        useUdpTransport ? "udp" : "tcp",
+        udpPacketSizeInBytes);
   }
 
   @Override
@@ -179,8 +310,12 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
       final AirGapSocket socket = new AirGapSocket(ip, port);
 
       try {
-        socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs);
-        socket.setKeepAlive(true);
+        if (useUdpTransport) {
+          socket.connectUdp(handshakeTimeoutMs);
+        } else {
+          socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs);
+          socket.setKeepAlive(true);
+        }
         sockets.set(i, socket);
         LOGGER.info(PipeMessages.CONNECTED_TO_TARGET_SERVER, ip, port);
         failLogTimes.remove(nodeUrls.get(i));
@@ -318,6 +453,10 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
           String.format("Socket %s is closed, will try to handshake", socket));
     }
 
+    if (socket.isUdp()) {
+      return sendBytesByUdp(socket, bytes);
+    }
+
     final BufferedOutputStream outputStream = new 
BufferedOutputStream(socket.getOutputStream());
     bytes = enrichWithLengthAndChecksum(bytes);
     outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes);
@@ -328,6 +467,101 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
     return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response);
   }
 
+  /**
+   * Sends the request over UDP, one datagram per slice, stopping at the first datagram that is
+   * not acknowledged.
+   *
+   * @return {@code true} only if every datagram was acknowledged
+   */
+  private boolean sendBytesByUdp(final AirGapSocket socket, final byte[] bytes)
+      throws IOException {
+    final List<byte[]> datagrams = sliceIfNeededForUdp(bytes);
+    int sentCount = 0;
+    while (sentCount < datagrams.size() && sendOneDatagram(socket, datagrams.get(sentCount))) {
+      sentCount++;
+    }
+    return sentCount == datagrams.size();
+  }
+
+  /**
+   * Wraps one request in the length/checksum envelope (and the E-language envelope when
+   * enabled), sends it as a single datagram and waits for the one-byte response.
+   *
+   * @return {@code true} if a non-empty response equal to {@code AirGapOneByteResponse.OK} arrived
+   * @throws IOException if the wrapped datagram exceeds the configured packet size, or if
+   *     sending or receiving fails
+   */
+  private boolean sendOneDatagram(final AirGapSocket socket, final byte[] requestBytes)
+      throws IOException {
+    byte[] payload = enrichWithLengthAndChecksum(requestBytes);
+    if (eLanguageEnable) {
+      payload = enrichWithELanguage(payload);
+    }
+
+    final int payloadSize = payload.length;
+    if (payloadSize > udpPacketSizeInBytes) {
+      throw new IOException(
+          String.format(
+              "Air gap UDP datagram size %d exceeds configured packet size %d.",
+              payloadSize, udpPacketSizeInBytes));
+    }
+
+    final DatagramSocket channel = socket.getDatagramSocket();
+    final DatagramPacket outgoing =
+        new DatagramPacket(
+            payload, payloadSize, socket.getDatagramAddress(), socket.getEndPoint().getPort());
+    channel.send(outgoing);
+
+    // The receiver answers with a single status byte.
+    final byte[] ack = new byte[1];
+    final DatagramPacket incoming = new DatagramPacket(ack, ack.length);
+    channel.receive(incoming);
+    if (incoming.getLength() <= 0) {
+      return false;
+    }
+    return Arrays.equals(AirGapOneByteResponse.OK, ack);
+  }
+
+  /**
+   * Returns the request unchanged when it fits into one datagram after subtracting the envelope
+   * overhead; otherwise rebuilds it as a series of slice requests that each fit.
+   *
+   * @param requestBytes serialized request (version, type, body)
+   * @return the serialized requests to send, in order
+   * @throws IOException if the packet size leaves no room for a slice body
+   */
+  private List<byte[]> sliceIfNeededForUdp(final byte[] requestBytes) throws IOException {
+    int eLanguageOverhead = 0;
+    if (eLanguageEnable) {
+      eLanguageOverhead =
+          AirGapELanguageConstant.E_LANGUAGE_PREFIX.length
+              + AirGapELanguageConstant.E_LANGUAGE_SUFFIX.length;
+    }
+    final int maxRawPayloadSize = udpPacketSizeInBytes - UDP_ENVELOPE_SIZE - eLanguageOverhead;
+    if (requestBytes.length <= maxRawPayloadSize) {
+      return Arrays.asList(requestBytes);
+    }
+
+    // A slice request carries extra serialized fields, so reserve room for them.
+    final int maxSliceBodySize =
+        maxRawPayloadSize - UDP_SLICE_REQUEST_SERIALIZATION_RESERVED_SIZE;
+    if (maxSliceBodySize <= 0) {
+      throw new IOException(
+          String.format(
+              "Air gap UDP packet size %d is too small to transfer sliced requests.",
+              udpPacketSizeInBytes));
+    }
+
+    final TPipeTransferReq wholeRequest = toTPipeTransferReq(requestBytes);
+    final int orderId = PipeTransferSliceReqBuilder.nextSliceOrderId();
+    final int totalSlices =
+        PipeTransferSliceReqBuilder.getSliceCount(wholeRequest, maxSliceBodySize);
+
+    final List<byte[]> slices = new ArrayList<>(totalSlices);
+    int sliceIndex = 0;
+    while (sliceIndex < totalSlices) {
+      final TPipeTransferReq sliceRequest =
+          PipeTransferSliceReqBuilder.buildSliceReq(
+              wholeRequest, orderId, sliceIndex, totalSlices, maxSliceBodySize);
+      slices.add(toTPipeTransferBytes(sliceRequest));
+      sliceIndex++;
+    }
+    return slices;
+  }
+
+  /**
+   * Rebuilds a {@code TPipeTransferReq} from its serialized form: one byte of version, a short
+   * type, and the remaining bytes as the body.
+   */
+  private TPipeTransferReq toTPipeTransferReq(final byte[] requestBytes) {
+    final TPipeTransferReq parsed = new TPipeTransferReq();
+    final ByteBuffer reader = ByteBuffer.wrap(requestBytes);
+    // Field order matters: each read advances the buffer position.
+    parsed.version = ReadWriteIOUtils.readByte(reader);
+    parsed.type = ReadWriteIOUtils.readShort(reader);
+    parsed.body = reader.slice();
+    return parsed;
+  }
+
+  /**
+   * Serializes a request as version, type and body bytes, the layout that
+   * {@code toTPipeTransferReq} reads back.
+   *
+   * @throws IOException if writing to the in-memory stream fails
+   */
+  private byte[] toTPipeTransferBytes(final TPipeTransferReq request) throws IOException {
+    // Read the body through a duplicate so the request's own buffer position is untouched.
+    final ByteBuffer bodyView = request.body.duplicate();
+    final byte[] bodyBytes = new byte[bodyView.remaining()];
+    bodyView.get(bodyBytes);
+
+    try (final PublicBAOS sink = new PublicBAOS();
+        final DataOutputStream writer = new DataOutputStream(sink)) {
+      ReadWriteIOUtils.write(request.version, writer);
+      ReadWriteIOUtils.write(request.type, writer);
+      writer.write(bodyBytes);
+
+      return Arrays.copyOf(sink.getBuf(), sink.size());
+    }
+  }
+
   protected boolean send(final AirGapSocket socket, final byte[] bytes) throws 
IOException {
     return send(null, 0, socket, bytes);
   }


Reply via email to