This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 73cf0670b63 Pipe: Fixed the bug that air gap receiver may not respond 
in temporary timeout exception & Optimized the directory check in receiver & 
Fixed the bug that the "skipIfNoPrivileges" may be wrongly reused at receiver & 
Optimized the configNode pipe logic (#17556)
73cf0670b63 is described below

commit 73cf0670b637dd584c2ed806101485b8934ec0b6
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 29 11:20:57 2026 +0800

    Pipe: Fixed the bug that air gap receiver may not respond in temporary 
timeout exception & Optimized the directory check in receiver & Fixed the bug 
that the "skipIfNoPrivileges" may be wrongly reused at receiver & Optimized the 
configNode pipe logic (#17556)
    
    * re
    
    * sink
    
    * fix
    
    * rollback
---
 .../iotdb/confignode/manager/ProcedureManager.java |   4 +-
 .../runtime/heartbeat/PipeHeartbeatParser.java     |  38 +++--
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   6 +-
 .../runtime/heartbeat/PipeHeartbeatParserTest.java | 182 +++++++++++++++++++++
 .../PipeHandleLeaderChangeProcedureTest.java       |  46 ++++++
 .../protocol/airgap/IoTDBAirGapReceiver.java       |   5 +
 .../iotconsensusv2/IoTConsensusV2Receiver.java     |  68 ++++++--
 .../client/IoTDBDataNodeAsyncClientManager.java    |   5 +-
 .../protocol/airgap/IoTDBAirGapReceiverTest.java   | 103 ++++++++++++
 .../sink/IoTDBDataNodeAsyncClientManagerTest.java  |  88 ++++++++++
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  75 ++++++---
 .../pipe/receiver/PipeReceiverFilePathUtils.java   |  42 +++++
 .../pipe/receiver/IoTDBFileReceiverTest.java       |  46 ++++++
 13 files changed, 651 insertions(+), 57 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index b80376e014a..a9457f23964 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1666,7 +1666,7 @@ public class ProcedureManager {
     }
   }
 
-  public void pipeHandleMetaChange(
+  public boolean pipeHandleMetaChange(
       boolean needWriteConsensusOnConfigNodes, boolean 
needPushPipeMetaToDataNodes) {
     try {
       final long procedureId =
@@ -1674,8 +1674,10 @@ public class ProcedureManager {
               new PipeHandleMetaChangeProcedure(
                   needWriteConsensusOnConfigNodes, 
needPushPipeMetaToDataNodes));
       LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId: 
{}.", procedureId);
+      return true;
     } catch (Exception e) {
       LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e);
+      return false;
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index ace07f5e2d3..6dc11ddd3f3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -59,7 +59,7 @@ public class PipeHeartbeatParser {
     this.configManager = configManager;
 
     heartbeatCounter = 0;
-    registeredNodeNumber = 1;
+    registeredNodeNumber = getExpectedHeartbeatNodeCount();
 
     needWriteConsensusOnConfigNodes = new AtomicBoolean(false);
     needPushPipeMetaToDataNodes = new AtomicBoolean(false);
@@ -73,17 +73,8 @@ public class PipeHeartbeatParser {
     if (heartbeatCount % registeredNodeNumber == 0) {
       canSubmitHandleMetaChangeProcedure.set(true);
 
-      // registeredNodeNumber may be changed, update it here when we can 
submit procedure
-      registeredNodeNumber = 
configManager.getNodeManager().getRegisteredNodeCount();
-      if (registeredNodeNumber <= 0) {
-        LOGGER.warn(
-            "registeredNodeNumber is {} when parseHeartbeat from node 
(id={}).",
-            registeredNodeNumber,
-            nodeId);
-        // registeredNodeNumber can not be set to 0 in this class, otherwise 
may cause
-        // DivideByZeroException
-        registeredNodeNumber = 1;
-      }
+      // The expected reporter set may be changed, update it at the end of the 
current round.
+      registeredNodeNumber = getExpectedHeartbeatNodeCount();
     }
 
     if (pipeHeartbeat.isEmpty()
@@ -114,14 +105,14 @@ public class PipeHeartbeatParser {
                 if (canSubmitHandleMetaChangeProcedure.get()
                     && (needWriteConsensusOnConfigNodes.get()
                         || needPushPipeMetaToDataNodes.get())) {
-                  configManager
+                  if (configManager
                       .getProcedureManager()
                       .pipeHandleMetaChange(
-                          needWriteConsensusOnConfigNodes.get(), 
needPushPipeMetaToDataNodes.get());
-
-                  // Reset flags after procedure is submitted
-                  needWriteConsensusOnConfigNodes.set(false);
-                  needPushPipeMetaToDataNodes.set(false);
+                          needWriteConsensusOnConfigNodes.get(),
+                          needPushPipeMetaToDataNodes.get())) {
+                    needWriteConsensusOnConfigNodes.set(false);
+                    needPushPipeMetaToDataNodes.set(false);
+                  }
                 }
               } finally {
                 
configManager.getPipeManager().getPipeTaskCoordinator().unlock();
@@ -129,6 +120,17 @@ public class PipeHeartbeatParser {
             });
   }
 
+  private int getExpectedHeartbeatNodeCount() {
+    final int expectedNodeCount =
+        configManager.getNodeManager().getRegisteredDataNodeCount()
+            + (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1 
: 0);
+    if (expectedNodeCount <= 0) {
+      LOGGER.warn("Expected pipe heartbeat node count is {}, fallback to 1.", 
expectedNodeCount);
+      return 1;
+    }
+    return expectedNodeCount;
+  }
+
   private void parseHeartbeatAndSaveMetaChangeLocally(
       final AtomicReference<PipeTaskInfo> pipeTaskInfo,
       final int nodeId,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index f18737e8b7a..61f6f3cae2a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -164,7 +164,11 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
       final int oldDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
       final int newDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
       regionGroupToOldAndNewLeaderPairMap.put(
-          new TConsensusGroupId(TConsensusGroupType.DataRegion, 
dataRegionGroupId),
+          new TConsensusGroupId(
+              dataRegionGroupId == Integer.MIN_VALUE
+                  ? TConsensusGroupType.ConfigRegion
+                  : TConsensusGroupType.DataRegion,
+              dataRegionGroupId),
           new Pair<>(oldDataRegionLeaderId, newDataRegionLeaderId));
     }
   }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
new file mode 100644
index 00000000000..d5a46d42c84
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.ProcedureManager;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
+import 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator;
+import 
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PipeHeartbeatParserTest {
+
+  private boolean originalSeparatedPipeHeartbeatEnabled;
+
+  @Before
+  public void setUp() {
+    originalSeparatedPipeHeartbeatEnabled =
+        
CommonDescriptor.getInstance().getConfig().isSeperatedPipeHeartbeatEnabled();
+  }
+
+  @After
+  public void tearDown() {
+    CommonDescriptor.getInstance()
+        .getConfig()
+        
.setSeperatedPipeHeartbeatEnabled(originalSeparatedPipeHeartbeatEnabled);
+  }
+
+  @Test
+  public void 
testParseHeartbeatCountsOnlyDataNodesWhenSeparatedHeartbeatDisabled()
+      throws Exception {
+    
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
+
+    final ParserTestContext context = createParserTestContext(2);
+    setMetaChangeFlags(context.parser, true, false);
+
+    context.parser.parseHeartbeat(1, emptyHeartbeat());
+    verify(context.procedureManager, 
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+    context.parser.parseHeartbeat(2, emptyHeartbeat());
+    verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, 
false);
+  }
+
+  @Test
+  public void 
testParseHeartbeatCountsLocalConfigNodeWhenSeparatedHeartbeatEnabled()
+      throws Exception {
+    
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(true);
+
+    final ParserTestContext context = createParserTestContext(2);
+    setMetaChangeFlags(context.parser, true, false);
+
+    context.parser.parseHeartbeat(1, emptyHeartbeat());
+    context.parser.parseHeartbeat(2, emptyHeartbeat());
+    verify(context.procedureManager, 
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+    context.parser.parseHeartbeat(3, emptyHeartbeat());
+    verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, 
false);
+  }
+
+  @Test
+  public void 
testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() throws 
Exception {
+    
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
+
+    final ParserTestContext context = createParserTestContext(2);
+    when(context.procedureManager.pipeHandleMetaChange(anyBoolean(), 
anyBoolean()))
+        .thenReturn(false, true);
+    setMetaChangeFlags(context.parser, true, false);
+
+    context.parser.parseHeartbeat(1, emptyHeartbeat());
+    verify(context.procedureManager, 
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+    context.parser.parseHeartbeat(2, emptyHeartbeat());
+    verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, 
false);
+
+    context.parser.parseHeartbeat(3, emptyHeartbeat());
+    verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, 
false);
+
+    context.parser.parseHeartbeat(4, emptyHeartbeat());
+    verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, 
false);
+  }
+
+  private ParserTestContext createParserTestContext(final int 
registeredDataNodeCount) {
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final NodeManager nodeManager = Mockito.mock(NodeManager.class);
+    final ProcedureManager procedureManager = 
Mockito.mock(ProcedureManager.class);
+    final PipeManager pipeManager = Mockito.mock(PipeManager.class);
+    final PipeRuntimeCoordinator pipeRuntimeCoordinator =
+        Mockito.mock(PipeRuntimeCoordinator.class);
+    final PipeTaskCoordinator pipeTaskCoordinator = 
Mockito.mock(PipeTaskCoordinator.class);
+    final ExecutorService procedureSubmitter = 
Mockito.mock(ExecutorService.class);
+
+    when(configManager.getNodeManager()).thenReturn(nodeManager);
+    when(configManager.getProcedureManager()).thenReturn(procedureManager);
+    when(configManager.getPipeManager()).thenReturn(pipeManager);
+    
when(nodeManager.getRegisteredDataNodeCount()).thenReturn(registeredDataNodeCount);
+    
when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator);
+    when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
+    
when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter);
+    when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new 
PipeTaskInfo()));
+    when(procedureManager.pipeHandleMetaChange(anyBoolean(), 
anyBoolean())).thenReturn(true);
+    Mockito.doAnswer(
+            invocation -> {
+              ((Runnable) invocation.getArgument(0)).run();
+              return CompletableFuture.completedFuture(null);
+            })
+        .when(procedureSubmitter)
+        .submit(any(Runnable.class));
+
+    return new ParserTestContext(new PipeHeartbeatParser(configManager), 
procedureManager);
+  }
+
+  private void setMetaChangeFlags(
+      final PipeHeartbeatParser parser,
+      final boolean needWriteConsensusOnConfigNodes,
+      final boolean needPushPipeMetaToDataNodes)
+      throws Exception {
+    setAtomicBooleanField(
+        parser, "needWriteConsensusOnConfigNodes", 
needWriteConsensusOnConfigNodes);
+    setAtomicBooleanField(parser, "needPushPipeMetaToDataNodes", 
needPushPipeMetaToDataNodes);
+  }
+
+  private void setAtomicBooleanField(
+      final PipeHeartbeatParser parser, final String fieldName, final boolean 
value)
+      throws Exception {
+    final Field field = PipeHeartbeatParser.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    ((AtomicBoolean) field.get(parser)).set(value);
+  }
+
+  private PipeHeartbeat emptyHeartbeat() {
+    return new PipeHeartbeat(Collections.emptyList(), null, null, null);
+  }
+
+  private static class ParserTestContext {
+    private final PipeHeartbeatParser parser;
+    private final ProcedureManager procedureManager;
+
+    private ParserTestContext(
+        final PipeHeartbeatParser parser, final ProcedureManager 
procedureManager) {
+      this.parser = parser;
+      this.procedureManager = procedureManager;
+    }
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
index b76e291b0c3..75c0963a27f 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
@@ -21,7 +21,10 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.PublicBAOS;
@@ -45,6 +48,9 @@ public class PipeHandleLeaderChangeProcedureTest {
     leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), 
new Pair<>(1, 2));
     leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2), 
new Pair<>(2, 3));
     leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3), 
new Pair<>(4, 5));
+    leaderMap.put(
+        new TConsensusGroupId(TConsensusGroupType.ConfigRegion, 
Integer.MIN_VALUE),
+        new Pair<>(6, 7));
 
     PipeHandleLeaderChangeProcedure proc = new 
PipeHandleLeaderChangeProcedure(leaderMap);
 
@@ -60,4 +66,44 @@ public class PipeHandleLeaderChangeProcedureTest {
       fail();
     }
   }
+
+  @Test
+  public void deserializeOldFormatConfigRegionTest() {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>();
+    leaderMap.put(
+        new TConsensusGroupId(TConsensusGroupType.ConfigRegion, 
Integer.MIN_VALUE),
+        new Pair<>(6, 7));
+
+    try {
+      
outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
+      outputStream.writeLong(Procedure.NO_PROC_ID);
+      outputStream.writeInt(ProcedureState.INITIALIZING.ordinal());
+      outputStream.writeLong(0L);
+      outputStream.writeLong(0L);
+      outputStream.writeLong(Procedure.NO_PROC_ID);
+      outputStream.writeLong(Procedure.NO_TIMEOUT);
+      outputStream.writeInt(-1);
+      outputStream.write((byte) 0);
+      outputStream.writeInt(-1);
+      outputStream.write((byte) 0);
+      outputStream.writeInt(0);
+      outputStream.write((byte) 0);
+      outputStream.writeInt(leaderMap.size());
+      outputStream.writeInt(Integer.MIN_VALUE);
+      outputStream.writeInt(6);
+      outputStream.writeInt(7);
+
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+      PipeHandleLeaderChangeProcedure proc =
+          (PipeHandleLeaderChangeProcedure) 
ProcedureFactory.getInstance().create(buffer);
+
+      assertEquals(new PipeHandleLeaderChangeProcedure(leaderMap), proc);
+    } catch (Exception e) {
+      fail();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 8658d12b6a8..278c1ccaaef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -178,6 +178,11 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
       if (System.currentTimeMillis() - startTime
           < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
         handleReq(req, startTime);
+      } else {
+        LOGGER.warn(
+            "Pipe air gap receiver {}: Temporary unavailable retry timed out, 
returning FAIL to sender.",
+            receiverId);
+        fail();
       }
     } else {
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index e0ef8e4072c..27ce077252e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils;
 import 
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestType;
 import 
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2TransferFilePieceReq;
@@ -78,6 +79,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -517,16 +519,23 @@ public class IoTConsensusV2Receiver {
     long startPreCheckNanos = System.nanoTime();
     iotConsensusV2ReceiverMetrics.recordBorrowTsFileWriterTimer(
         startPreCheckNanos - startBorrowTsFileWriterNanos);
-    File writingFile = tsFileWriter.getWritingFile();
-    RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
-
-    File currentWritingDirPath = tsFileWriter.getLocalWritingDir();
+    final File writingFile = tsFileWriter.getWritingFile();
+    final RandomAccessFile writingFileWriter = 
tsFileWriter.getWritingFileWriter();
 
-    final List<File> files =
-        req.getFileNames().stream()
-            .map(fileName -> new File(currentWritingDirPath, fileName))
-            .collect(Collectors.toList());
+    final File currentWritingDirPath = tsFileWriter.getLocalWritingDir();
     try {
+      final List<File> files =
+          req.getFileNames().stream()
+              .map(
+                  fileName -> {
+                    try {
+                      return resolveWritingFilePath(tsFileWriter, 
fileName).toFile();
+                    } catch (final IOException e) {
+                      throw new IllegalArgumentException(e);
+                    }
+                  })
+              .collect(Collectors.toList());
+
       if (isWritingFileNonAvailable(tsFileWriter)) {
         final TSStatus status =
             RpcUtils.getStatus(
@@ -601,16 +610,20 @@ public class IoTConsensusV2Receiver {
       }
       return new TIoTConsensusV2TransferResp(status);
     } catch (Exception e) {
+      final Throwable rootCause = e instanceof IllegalArgumentException ? 
e.getCause() : e;
       LOGGER.warn(
           "IoTConsensusV2-PipeName-{}: Failed to seal file {} from req {}.",
           consensusPipeName,
-          files,
+          req.getFileNames(),
           req,
-          e);
+          rootCause);
       return new TIoTConsensusV2TransferResp(
           RpcUtils.getStatus(
               TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
-              String.format("Failed to seal file %s because %s", writingFile, 
e.getMessage())));
+              String.format(
+                  "Failed to seal file %s because %s",
+                  req.getFileNames(),
+                  rootCause == null ? e.getMessage() : 
rootCause.getMessage())));
     } finally {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod(if exists) should be 
retransmitted by the
@@ -809,7 +822,22 @@ public class IoTConsensusV2Receiver {
   private boolean isFileExistedAndNameCorrect(
       IoTConsensusV2TsFileWriter tsFileWriter, String fileName) {
     final File writingFile = tsFileWriter.getWritingFile();
-    return writingFile != null && writingFile.getName().equals(fileName);
+    try {
+      return writingFile != null
+          && writingFile.exists()
+          && writingFile
+              .toPath()
+              .toAbsolutePath()
+              .normalize()
+              .equals(resolveWritingFilePath(tsFileWriter, fileName));
+    } catch (final IOException e) {
+      LOGGER.warn(
+          "IoTConsensusV2-PipeName-{}: Illegal file name {} when checking 
writing file.",
+          consensusPipeName,
+          fileName,
+          e);
+      return false;
+    }
   }
 
   private boolean isWritingFileOffsetNonCorrect(
@@ -874,7 +902,7 @@ public class IoTConsensusV2Receiver {
     }
     // Every tsFileWriter has its own writing path.
     // 1 Thread --> 1 connection --> 1 tsFileWriter --> 1 path
-    tsFileWriter.setWritingFile(new File(tsFileWriter.getLocalWritingDir(), 
fileName));
+    tsFileWriter.setWritingFile(resolveWritingFilePath(tsFileWriter, 
fileName).toFile());
     tsFileWriter.setWritingFileWriter(new 
RandomAccessFile(tsFileWriter.getWritingFile(), "rw"));
     LOGGER.info(
         "IoTConsensusV2-PipeName-{}: Writing file {} was created. Ready to 
write file pieces.",
@@ -882,6 +910,20 @@ public class IoTConsensusV2Receiver {
         tsFileWriter.getWritingFile().getPath());
   }
 
+  private Path resolveWritingFilePath(
+      final IoTConsensusV2TsFileWriter tsFileWriter, final String fileName) 
throws IOException {
+    try {
+      return PipeReceiverFilePathUtils.resolveFilePath(
+          tsFileWriter.getLocalWritingDir().toPath(), fileName);
+    } catch (final IOException e) {
+      LOGGER.error(
+          "IoTConsensusV2-PipeName-{}: Path traversal attempt detected! 
Filename: {}",
+          consensusPipeName,
+          fileName);
+      throw e;
+    }
+  }
+
   private void initiateTsFileBufferFolder(List<String> receiverBaseDirsName) 
throws IOException {
     // initiate receiverFileDirs
     for (String receiverFileBaseDir : receiverBaseDirsName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 88a79146295..09580fec279 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -119,14 +119,15 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
     receiverAttributes =
         String.format(
-            "%s-%s-%s-%s-%s-%s",
+            "%s-%s-%s-%s-%s-%s-%s",
             Base64.getEncoder()
                 .encodeToString((userEntity.getUsername() + ":" + 
password).getBytes()),
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
             validateTsFile,
             shouldMarkAsPipeRequest,
-            isTSFileUsed);
+            isTSFileUsed,
+            skipIfNoPrivileges);
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
       if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
 {
         ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
index 19dea8140a1..e23db1f1ca8 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
@@ -19,18 +19,32 @@
 
 package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
 import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
+import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse;
+import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.tsfile.utils.BytesUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 
 public class IoTDBAirGapReceiverTest {
 
@@ -69,4 +83,93 @@ public class IoTDBAirGapReceiverTest {
         Assert.assertThrows(IOException.class, () -> 
receiver.readData(inputStream));
     Assert.assertTrue(exception.getMessage().contains("nested E-Language 
prefix"));
   }
+
+  @Test
+  public void testTemporaryUnavailableRetryTimeoutReturnsFail() throws 
Exception {
+    final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
+    final long originalRetryLocalIntervalMs = 
commonConfig.getPipeAirGapRetryLocalIntervalMs();
+    final long originalRetryMaxMs = commonConfig.getPipeAirGapRetryMaxMs();
+
+    try {
+      commonConfig.setPipeAirGapRetryLocalIntervalMs(0);
+      commonConfig.setPipeAirGapRetryMaxMs(1);
+
+      final RecordingSocket socket = new RecordingSocket();
+      final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(socket, 3L);
+      final StubIoTDBDataNodeReceiverAgent stubAgent = new 
StubIoTDBDataNodeReceiverAgent();
+      stubAgent.setStubReceiver(
+          new StubReceiver(
+              new TSStatus(
+                  
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())));
+      setField(receiver, "agent", stubAgent);
+
+      final AirGapPseudoTPipeTransferRequest req = new 
AirGapPseudoTPipeTransferRequest();
+      req.setVersion(IoTDBSinkRequestVersion.VERSION_1.getVersion());
+      req.setType((short) 0);
+      req.setBody(ByteBuffer.allocate(0));
+
+      final Method handleReq =
+          IoTDBAirGapReceiver.class.getDeclaredMethod(
+              "handleReq", AirGapPseudoTPipeTransferRequest.class, long.class);
+      handleReq.setAccessible(true);
+      handleReq.invoke(receiver, req, System.currentTimeMillis() - 10_000L);
+
+      Assert.assertArrayEquals(AirGapOneByteResponse.FAIL, 
socket.getWrittenBytes());
+    } finally {
+      
commonConfig.setPipeAirGapRetryLocalIntervalMs(originalRetryLocalIntervalMs);
+      commonConfig.setPipeAirGapRetryMaxMs(originalRetryMaxMs);
+    }
+  }
+
+  private static void setField(final Object target, final String fieldName, 
final Object value)
+      throws Exception {
+    final Field field = IoTDBAirGapReceiver.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+
+  private static class RecordingSocket extends Socket {
+
+    private final ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream();
+
+    @Override
+    public OutputStream getOutputStream() {
+      return outputStream;
+    }
+
+    byte[] getWrittenBytes() {
+      return outputStream.toByteArray();
+    }
+  }
+
+  private static class StubIoTDBDataNodeReceiverAgent extends 
IoTDBDataNodeReceiverAgent {
+
+    void setStubReceiver(final IoTDBReceiver receiver) {
+      setReceiverWithSpecifiedClient(null, receiver);
+    }
+  }
+
+  private static class StubReceiver implements IoTDBReceiver {
+
+    private final TPipeTransferResp response;
+
+    private StubReceiver(final TSStatus status) {
+      response = new TPipeTransferResp(status);
+    }
+
+    @Override
+    public TPipeTransferResp receive(final TPipeTransferReq req) {
+      return response;
+    }
+
+    @Override
+    public void handleExit() {
+      // noop for unit test
+    }
+
+    @Override
+    public IoTDBSinkRequestVersion getVersion() {
+      return IoTDBSinkRequestVersion.VERSION_1;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
new file mode 100644
index 00000000000..fb13e438dec
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.audit.UserEntity;
+import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeAsyncClientManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+
+public class IoTDBDataNodeAsyncClientManagerTest {
+
+  @Test
+  public void testReceiverAttributesShouldDifferentiateSkipIfNoPrivileges() 
throws Exception {
+    final IoTDBDataNodeAsyncClientManager managerWithSkipIf =
+        new IoTDBDataNodeAsyncClientManager(
+            Collections.singletonList(new TEndPoint("127.0.0.1", 6667)),
+            false,
+            "round-robin",
+            new UserEntity(1L, "user", "cli-host"),
+            "password",
+            true,
+            "sync",
+            true,
+            true,
+            false,
+            true);
+    final IoTDBDataNodeAsyncClientManager managerWithoutSkipIf =
+        new IoTDBDataNodeAsyncClientManager(
+            Collections.singletonList(new TEndPoint("127.0.0.1", 6667)),
+            false,
+            "round-robin",
+            new UserEntity(1L, "user", "cli-host"),
+            "password",
+            true,
+            "sync",
+            true,
+            true,
+            false,
+            false);
+
+    try {
+      Assert.assertNotEquals(
+          getReceiverAttributes(managerWithSkipIf), 
getReceiverAttributes(managerWithoutSkipIf));
+      Assert.assertNotSame(
+          getEndPoint2Client(managerWithSkipIf), 
getEndPoint2Client(managerWithoutSkipIf));
+    } finally {
+      managerWithSkipIf.close();
+      managerWithoutSkipIf.close();
+    }
+  }
+
+  private static String getReceiverAttributes(final 
IoTDBDataNodeAsyncClientManager manager)
+      throws Exception {
+    final Field field =
+        
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("receiverAttributes");
+    field.setAccessible(true);
+    return (String) field.get(manager);
+  }
+
+  private static Object getEndPoint2Client(final 
IoTDBDataNodeAsyncClientManager manager)
+      throws Exception {
+    final Field field = 
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("endPoint2Client");
+    field.setAccessible(true);
+    return field.get(manager);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index e2484576a77..6879f88ced8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -497,16 +497,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
             receiverFileDirWithIdSuffix.get().getPath());
       }
     }
-    Path baseDir = 
receiverFileDirWithIdSuffix.get().toPath().toAbsolutePath().normalize();
-    Path targetPath = baseDir.resolve(fileName).toAbsolutePath().normalize();
-
-    if (!targetPath.startsWith(baseDir)) {
-      LOGGER.error(
-          "Receiver id = {}: Path traversal attempt detected! Filename: {}",
-          receiverId.get(),
-          fileName);
-      throw new IOException("Illegal fileName: " + fileName + " (Path 
traversal detected)");
-    }
+    final Path targetPath = resolveReceiverFilePath(fileName);
 
     writingFile = targetPath.toFile();
     writingFileWriter = new RandomAccessFile(writingFile, "rw");
@@ -517,7 +508,37 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   }
 
   private boolean isFileExistedAndNameCorrect(final String fileName) {
-    return writingFile != null && writingFile.exists() && 
writingFile.getName().equals(fileName);
+    try {
+      return writingFile != null
+          && writingFile.exists()
+          && receiverFileDirWithIdSuffix.get() != null
+          && writingFile
+              .toPath()
+              .toAbsolutePath()
+              .normalize()
+              .equals(resolveReceiverFilePath(fileName));
+    } catch (final IOException e) {
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          "Receiver id = %s: Illegal file name %s when checking writing file.",
+          receiverId.get(),
+          fileName);
+      return false;
+    }
+  }
+
+  private Path resolveReceiverFilePath(final String fileName) throws 
IOException {
+    try {
+      return PipeReceiverFilePathUtils.resolveFilePath(
+          receiverFileDirWithIdSuffix.get().toPath(), fileName);
+    } catch (final IOException e) {
+      LOGGER.error(
+          "Receiver id = {}: Path traversal attempt detected! Filename: {}",
+          receiverId.get(),
+          fileName);
+      throw e;
+    }
   }
 
   private void closeCurrentWritingFileWriter(final boolean fsyncBeforeClose) {
@@ -680,15 +701,22 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   // Support null in fileName list, which means that this file is optional and 
is currently absent
   protected final TPipeTransferResp handleTransferFileSealV2(final 
PipeTransferFileSealReqV2 req) {
     final List<String> fileNames = req.getFileNames();
-    final List<File> files =
-        fileNames.stream()
-            .map(
-                fileName ->
-                    Objects.nonNull(fileName)
-                        ? new File(receiverFileDirWithIdSuffix.get(), fileName)
-                        : null)
-            .collect(Collectors.toList());
     try {
+      final List<File> files =
+          fileNames.stream()
+              .map(
+                  fileName -> {
+                    if (Objects.isNull(fileName)) {
+                      return null;
+                    }
+                    try {
+                      return resolveReceiverFilePath(fileName).toFile();
+                    } catch (final IOException e) {
+                      throw new IllegalArgumentException(e);
+                    }
+                  })
+              .collect(Collectors.toList());
+
       if (!isWritingFileAvailable()) {
         final TSStatus status =
             RpcUtils.getStatus(
@@ -754,17 +782,20 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       }
       return new TPipeTransferResp(status);
     } catch (final Exception e) {
+      final Throwable rootCause = e instanceof IllegalArgumentException ? 
e.getCause() : e;
       PipeLogger.log(
           LOGGER::warn,
-          e,
+          rootCause,
           "Receiver id = %s: Failed to seal file %s from req %s.",
           receiverId.get(),
-          files,
+          fileNames,
           req);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
-              String.format("Failed to seal file %s because %s", files, 
e.getMessage())));
+              String.format(
+                  "Failed to seal file %s because %s",
+                  fileNames, rootCause == null ? e.getMessage() : 
rootCause.getMessage())));
     } finally {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod(if exists) should be 
retransmitted by the
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
new file mode 100644
index 00000000000..bc7275d4ebe
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.receiver;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public final class PipeReceiverFilePathUtils {
+
+  private PipeReceiverFilePathUtils() {
+    // Utility class
+  }
+
+  public static Path resolveFilePath(final Path baseDir, final String 
fileName) throws IOException {
+    final Path normalizedBaseDir = baseDir.toAbsolutePath().normalize();
+    final Path normalizedTargetPath =
+        normalizedBaseDir.resolve(fileName).toAbsolutePath().normalize();
+
+    if (!normalizedTargetPath.startsWith(normalizedBaseDir)) {
+      throw new IOException("Illegal fileName: " + fileName + " (Path 
traversal detected)");
+    }
+
+    return normalizedTargetPath;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
index a372326433d..8d2db54d5b6 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.commons.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
@@ -33,6 +35,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 public class IoTDBFileReceiverTest {
@@ -63,6 +67,25 @@ public class IoTDBFileReceiverTest {
     }
   }
 
+  @Test
+  public void testRejectPathTraversalFileNameInSealRequest() throws Exception {
+    final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+    final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+    try {
+      receiver.createWritingFile("normal.tsfile", false);
+
+      final TPipeTransferResp response =
+          receiver.sealFiles(
+              Arrays.asList("../outside.mod", "normal.tsfile"), 
Arrays.asList(0L, 0L));
+
+      Assert.assertEquals(
+          TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode(), 
response.getStatus().getCode());
+      Assert.assertTrue(response.getStatus().getMessage().contains("Illegal 
fileName"));
+    } finally {
+      receiver.handleExit();
+    }
+  }
+
   private static class DummyFileReceiver extends IoTDBFileReceiver {
 
     DummyFileReceiver(final File baseDir) {
@@ -73,6 +96,12 @@ public class IoTDBFileReceiverTest {
       updateWritingFileIfNeeded(fileName, isSingleFile);
     }
 
+    TPipeTransferResp sealFiles(final List<String> fileNames, final List<Long> 
fileLengths)
+        throws IOException {
+      return handleTransferFileSealV2(
+          DummyFileSealReqV2.toTPipeTransferReq(fileNames, fileLengths, 
Collections.emptyMap()));
+    }
+
     File getWritingFileInBaseDir(final String fileName) {
       return 
receiverFileDirWithIdSuffix.get().toPath().resolve(fileName).toFile();
     }
@@ -130,4 +159,21 @@ public class IoTDBFileReceiverTest {
       return null;
     }
   }
+
+  private static class DummyFileSealReqV2 extends PipeTransferFileSealReqV2 {
+
+    static DummyFileSealReqV2 toTPipeTransferReq(
+        final List<String> fileNames,
+        final List<Long> fileLengths,
+        final java.util.Map<String, String> parameters)
+        throws IOException {
+      return (DummyFileSealReqV2)
+          new DummyFileSealReqV2().convertToTPipeTransferReq(fileNames, 
fileLengths, parameters);
+    }
+
+    @Override
+    protected PipeRequestType getPlanType() {
+      return PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL;
+    }
+  }
 }


Reply via email to