This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 751c5c5829b Pipe: Refactor AirGap receiver with configurable payload 
size control (#17443) (#17503)
751c5c5829b is described below

commit 751c5c5829b657d3af5922a156c5f8f47725988f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Apr 17 14:55:16 2026 +0800

    Pipe: Refactor AirGap receiver with configurable payload size control 
(#17443) (#17503)
    
    * Pipe: add hot-reloadable AirGap payload size guard to mitigate DoS risk.
    
    Introduce a dedicated AirGap receiver payload limit in the pipe config and 
enforce it before request buffer allocation, so oversized payloads are rejected 
early and memory pressure is bounded under malicious or malformed inputs.
    
    Made-with: Cursor
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * spotless
    
    (cherry picked from commit 7afedfe0969c332ee722209f14f8634f16a21a71)
---
 .../protocol/airgap/IoTDBAirGapReceiver.java       | 25 +++++++-
 .../protocol/airgap/IoTDBAirGapReceiverTest.java   | 72 ++++++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 26 ++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  7 +++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  5 ++
 5 files changed, 132 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 0ff1834e8cd..8658d12b6a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -222,7 +222,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
     }
   }
 
-  private byte[] readData(final InputStream inputStream) throws IOException {
+  byte[] readData(final InputStream inputStream) throws IOException {
     final int length = readLength(inputStream);
 
     if (length <= 0) {
@@ -230,6 +230,14 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
       return new byte[0];
     }
 
+    final int maxLength = 
PipeConfig.getInstance().getPipeAirGapReceiverMaxPayloadSizeInBytes();
+    if (length > maxLength) {
+      throw new IOException(
+          String.format(
+              "AirGap payload length (%d) exceeds maximum allowed (%d). 
Closing connection from %s",
+              length, maxLength, socket.getRemoteSocketAddress()));
+    }
+
     final byte[] resultBuffer = new byte[length];
     readTillFull(inputStream, resultBuffer);
     if (isELanguagePayload) {
@@ -238,11 +246,16 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
     return resultBuffer;
   }
 
+  private int readLength(final InputStream inputStream) throws IOException {
+    return readLength(inputStream, false);
+  }
+
   /**
    * Read the length of the following data. The thread may typically block 
here when there is no
    * data to read.
    */
-  private int readLength(final InputStream inputStream) throws IOException {
+  private int readLength(final InputStream inputStream, final boolean 
isELanguage)
+      throws IOException {
     final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN];
     readTillFull(inputStream, doubleIntLengthBytes);
 
@@ -251,10 +264,16 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
     if (Arrays.equals(
         doubleIntLengthBytes,
         BytesUtils.subBytes(AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, 2 * 
INT_LEN))) {
+      if (isELanguage) {
+        throw new IOException(
+            String.format(
+                "Detected suspicious nested E-Language prefix. Closing 
connection from %s",
+                socket.getRemoteSocketAddress()));
+      }
       isELanguagePayload = true;
       skipTillEnough(
           inputStream, (long) AirGapELanguageConstant.E_LANGUAGE_PREFIX.length 
- 2 * INT_LEN);
-      return readLength(inputStream);
+      return readLength(inputStream, true);
     }
 
     final byte[] dataLengthBytes = BytesUtils.subBytes(doubleIntLengthBytes, 
0, INT_LEN);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
new file mode 100644
index 00000000000..19dea8140a1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
+
+import org.apache.tsfile.utils.BytesUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+public class IoTDBAirGapReceiverTest {
+
+  @Test
+  public void testRejectOversizedAirGapPayload() throws Exception {
+    final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
+    final int originalMaxPayload = 
commonConfig.getPipeAirGapReceiverMaxPayloadSizeInBytes();
+
+    try {
+      commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(16);
+      final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new 
Socket(), 1L);
+
+      final byte[] oversizedLength = BytesUtils.intToBytes(32);
+      final InputStream inputStream =
+          new ByteArrayInputStream(BytesUtils.concatByteArray(oversizedLength, 
oversizedLength));
+
+      final IOException exception =
+          Assert.assertThrows(IOException.class, () -> 
receiver.readData(inputStream));
+      Assert.assertTrue(exception.getMessage().contains("payload length"));
+    } finally {
+      
commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(originalMaxPayload);
+    }
+  }
+
+  @Test
+  public void testRejectNestedELanguagePrefix() throws Exception {
+    final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 
2L);
+
+    final InputStream inputStream =
+        new ByteArrayInputStream(
+            BytesUtils.concatByteArray(
+                AirGapELanguageConstant.E_LANGUAGE_PREFIX,
+                AirGapELanguageConstant.E_LANGUAGE_PREFIX));
+
+    final IOException exception =
+        Assert.assertThrows(IOException.class, () -> 
receiver.readData(inputStream));
+    Assert.assertTrue(exception.getMessage().contains("nested E-Language 
prefix"));
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index f8e845a7af2..ec9ce06fd1d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -309,6 +309,11 @@ public class CommonConfig {
   private double pipeReceiverActualToEstimatedMemoryRatio = 3;
 
   private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
+  // Align with default thrift frame size calculation.
+  private int pipeAirGapReceiverMaxPayloadSizeInBytes =
+      Math.min(
+          64 * 1024 * 1024,
+          (int) Math.min(Runtime.getRuntime().maxMemory() / 64, 
Integer.MAX_VALUE));
   private boolean pipeReceiverLoadConversionEnabled = false;
   private volatile long pipePeriodicalLogMinIntervalSeconds = 60;
   private volatile long pipeLoggerCacheMaxSizeInBytes = 16 * MB;
@@ -1646,6 +1651,23 @@ public class CommonConfig {
         pipeReceiverReqDecompressedMaxLengthInBytes);
   }
 
+  public void setPipeAirGapReceiverMaxPayloadSizeInBytes(
+      int pipeAirGapReceiverMaxPayloadSizeInBytes) {
+    if (pipeAirGapReceiverMaxPayloadSizeInBytes <= 0) {
+      logger.info(
+          "Ignore invalid pipeAirGapReceiverMaxPayloadSizeInBytes {}, because 
it must be greater than 0.",
+          pipeAirGapReceiverMaxPayloadSizeInBytes);
+      return;
+    }
+    if (this.pipeAirGapReceiverMaxPayloadSizeInBytes == 
pipeAirGapReceiverMaxPayloadSizeInBytes) {
+      return;
+    }
+    this.pipeAirGapReceiverMaxPayloadSizeInBytes = 
pipeAirGapReceiverMaxPayloadSizeInBytes;
+    logger.info(
+        "pipeAirGapReceiverMaxPayloadSizeInBytes is set to {}.",
+        pipeAirGapReceiverMaxPayloadSizeInBytes);
+  }
+
   public boolean isPipeReceiverLoadConversionEnabled() {
     return pipeReceiverLoadConversionEnabled;
   }
@@ -1687,6 +1709,10 @@ public class CommonConfig {
     return pipeReceiverReqDecompressedMaxLengthInBytes;
   }
 
+  public int getPipeAirGapReceiverMaxPayloadSizeInBytes() {
+    return pipeAirGapReceiverMaxPayloadSizeInBytes;
+  }
+
   public double getPipeMetaReportMaxLogNumPerRound() {
     return pipeMetaReportMaxLogNumPerRound;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 966b22b5d15..0840bb48e59 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -361,6 +361,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
   }
 
+  public int getPipeAirGapReceiverMaxPayloadSizeInBytes() {
+    return COMMON_CONFIG.getPipeAirGapReceiverMaxPayloadSizeInBytes();
+  }
+
   public boolean isPipeReceiverLoadConversionEnabled() {
     return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
   }
@@ -627,6 +631,9 @@ public class PipeConfig {
     LOGGER.info(
         "PipeReceiverReqDecompressedMaxLengthInBytes: {}",
         getPipeReceiverReqDecompressedMaxLengthInBytes());
+    LOGGER.info(
+        "PipeAirGapReceiverMaxPayloadSizeInBytes: {}",
+        getPipeAirGapReceiverMaxPayloadSizeInBytes());
     LOGGER.info("PipeReceiverLoadConversionEnabled: {}", 
isPipeReceiverLoadConversionEnabled());
     LOGGER.info(
         "PipePeriodicalLogMinIntervalSeconds: {}", 
getPipePeriodicalLogMinIntervalSeconds());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 51d88e78345..02284671803 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -464,6 +464,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_receiver_req_decompressed_max_length_in_bytes",
                 
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
+    config.setPipeAirGapReceiverMaxPayloadSizeInBytes(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_air_gap_receiver_max_payload_size_in_bytes",
+                
String.valueOf(config.getPipeAirGapReceiverMaxPayloadSizeInBytes()))));
     config.setPipeReceiverLoadConversionEnabled(
         Boolean.parseBoolean(
             properties.getProperty(

Reply via email to