This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 494c5915cf1 Pipe: Fixed the potential leak file handle at the receiver (#17589) (#17606)
494c5915cf1 is described below

commit 494c5915cf1cac476e1adb4c9e1887fd8562de4d
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:34:19 2026 +0800

    Pipe: Fixed the potential leak file handle at the receiver (#17589) (#17606)
---
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  26 ++++-
 .../pipe/receiver/IoTDBFileReceiverTest.java       | 124 ++++++++++++++++++++-
 2 files changed, 145 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 61d9155f996..7346ae2bccd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -126,6 +126,10 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
             String.format(
                 "Pipe-Receiver-%s-%s:%s", receiverId.get(), getSenderHost(), 
getSenderPort()));
 
+    // Handshake restarts the transfer session. Reset the current writing state before recycling the
+    // old receiver dir, otherwise the old file handle can survive across handshakes.
+    resetCurrentWritingFileState();
+
     // Clear the original receiver file dir if exists
     if (receiverFileDirWithIdSuffix.get() != null) {
       if (receiverFileDirWithIdSuffix.get().exists()) {
@@ -547,6 +551,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     }
   }
 
+  private void resetCurrentWritingFileState() {
+    closeCurrentWritingFileWriter(false);
+    writingFile = null;
+  }
+
   private void deleteFile(final File file) {
     if (file.exists()) {
       try {
@@ -589,6 +598,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   }
 
   protected final TPipeTransferResp handleTransferFileSealV1(final 
PipeTransferFileSealReqV1 req) {
+    File sealedWritingFile = null;
+    boolean shouldDeleteSealedFile = true;
     try {
       if (!isWritingFileAvailable()) {
         final TSStatus status =
@@ -605,7 +616,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         return resp;
       }
 
-      final String fileAbsolutePath = writingFile.getAbsolutePath();
+      sealedWritingFile = writingFile;
+      final String fileAbsolutePath = sealedWritingFile.getAbsolutePath();
 
       // Sync here is necessary to ensure that the data is written to the 
disk. Or data region may
       // load the file before the data is written to the disk and cause 
unexpected behavior after
@@ -624,11 +636,13 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       writingFileWriter.close();
       writingFileWriter = null;
 
-      // writingFile will be deleted after load if no exception occurs
+      // Clear the reference before loading so the next file transfer can not reuse the same path.
+      // The loader owns cleanup after a successful load.
       writingFile = null;
 
       final TSStatus status = loadFileV1(req, fileAbsolutePath);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        shouldDeleteSealedFile = false;
         LOGGER.info(
             "Receiver id = {}: Seal file {} successfully.", receiverId.get(), 
fileAbsolutePath);
       } else {
@@ -657,7 +671,13 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       // All pieces of the writing file and its mod (if exists) should be 
retransmitted by the
       // sender.
       closeCurrentWritingFileWriter(false);
-      deleteCurrentWritingFile();
+      if (shouldDeleteSealedFile) {
+        if (writingFile != null) {
+          deleteCurrentWritingFile();
+        } else if (sealedWritingFile != null) {
+          deleteFile(sealedWritingFile);
+        }
+      }
     }
   }
 
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
index 337cece8a48..c692cb1513a 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.commons.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV1Req;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -33,6 +35,8 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
@@ -86,9 +90,61 @@ public class IoTDBFileReceiverTest {
     }
   }
 
+  @Test
+  public void testHandshakeResetsWritingFileState() throws Exception {
+    final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+    final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+    try {
+      receiver.handshake();
+      receiver.createWritingFile("normal.tsfile", true);
+      receiver.writeToCurrentWritingFile(new byte[] {1, 2, 3});
+
+      final File oldReceiverDir = receiver.getCurrentReceiverDir();
+      Assert.assertNotNull(receiver.getCurrentWritingFile());
+      Assert.assertNotNull(receiver.getCurrentWritingFileWriter());
+
+      receiver.handshake();
+
+      Assert.assertFalse(oldReceiverDir.exists());
+      Assert.assertNull(receiver.getCurrentWritingFile());
+      Assert.assertNull(receiver.getCurrentWritingFileWriter());
+      Assert.assertNotEquals(
+          oldReceiverDir.getAbsolutePath(), 
receiver.getCurrentReceiverDir().getAbsolutePath());
+    } finally {
+      receiver.handleExit();
+    }
+  }
+
+  @Test
+  public void testSealFileV1FailureDeletesTransferredFile() throws Exception {
+    final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+    final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+    try {
+      receiver.createWritingFile("normal.tsfile", true);
+      receiver.writeToCurrentWritingFile(new byte[] {1, 2, 3});
+      receiver.setLoadFileV1Status(
+          new TSStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode()));
+
+      final File transferredFile = 
receiver.getWritingFileInBaseDir("normal.tsfile");
+      final TPipeTransferResp response = receiver.sealFileV1("normal.tsfile", 
3L);
+
+      Assert.assertEquals(
+          TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode(), 
response.getStatus().getCode());
+      Assert.assertFalse(transferredFile.exists());
+      Assert.assertNull(receiver.getCurrentWritingFile());
+      Assert.assertNull(receiver.getCurrentWritingFileWriter());
+    } finally {
+      receiver.handleExit();
+    }
+  }
+
   private static class DummyFileReceiver extends IoTDBFileReceiver {
 
+    private final File receiverFileBaseDir;
+    private TSStatus loadFileV1Status = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+
     DummyFileReceiver(final File baseDir) {
+      receiverFileBaseDir = baseDir;
       receiverFileDirWithIdSuffix.set(baseDir);
     }
 
@@ -96,6 +152,24 @@ public class IoTDBFileReceiverTest {
       updateWritingFileIfNeeded(fileName, isSingleFile);
     }
 
+    void handshake() throws IOException {
+      handleTransferHandshakeV1(
+          DummyHandshakeReq.toTPipeTransferReq(
+              
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+    }
+
+    void writeToCurrentWritingFile(final byte[] bytes) throws Exception {
+      getCurrentWritingFileWriter().write(bytes);
+    }
+
+    void setLoadFileV1Status(final TSStatus status) {
+      loadFileV1Status = status;
+    }
+
+    TPipeTransferResp sealFileV1(final String fileName, final long fileLength) 
throws IOException {
+      return 
handleTransferFileSealV1(DummyFileSealReqV1.toTPipeTransferReq(fileName, 
fileLength));
+    }
+
     TPipeTransferResp sealFiles(final List<String> fileNames, final List<Long> 
fileLengths)
         throws IOException {
       return handleTransferFileSealV2(
@@ -106,9 +180,27 @@ public class IoTDBFileReceiverTest {
       return 
receiverFileDirWithIdSuffix.get().toPath().resolve(fileName).toFile();
     }
 
+    File getCurrentReceiverDir() {
+      return receiverFileDirWithIdSuffix.get();
+    }
+
+    File getCurrentWritingFile() throws Exception {
+      return (File) getField("writingFile").get(this);
+    }
+
+    RandomAccessFile getCurrentWritingFileWriter() throws Exception {
+      return (RandomAccessFile) getField("writingFileWriter").get(this);
+    }
+
+    private Field getField(final String fieldName) throws NoSuchFieldException 
{
+      final Field field = IoTDBFileReceiver.class.getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return field;
+    }
+
     @Override
     protected String getReceiverFileBaseDir() {
-      return receiverFileDirWithIdSuffix.get().getAbsolutePath();
+      return receiverFileBaseDir.getAbsolutePath();
     }
 
     @Override
@@ -134,7 +226,7 @@ public class IoTDBFileReceiverTest {
     @Override
     protected TSStatus loadFileV1(
         final PipeTransferFileSealReqV1 req, final String fileAbsolutePath) {
-      return new TSStatus(200);
+      return loadFileV1Status;
     }
 
     @Override
@@ -155,6 +247,34 @@ public class IoTDBFileReceiverTest {
     }
   }
 
+  private static class DummyHandshakeReq extends PipeTransferHandshakeV1Req {
+
+    static DummyHandshakeReq toTPipeTransferReq(final String 
timestampPrecision)
+        throws IOException {
+      return (DummyHandshakeReq)
+          new 
DummyHandshakeReq().convertToTPipeTransferReq(timestampPrecision);
+    }
+
+    @Override
+    protected PipeRequestType getPlanType() {
+      return PipeRequestType.HANDSHAKE_DATANODE_V1;
+    }
+  }
+
+  private static class DummyFileSealReqV1 extends PipeTransferFileSealReqV1 {
+
+    static DummyFileSealReqV1 toTPipeTransferReq(final String fileName, final 
long fileLength)
+        throws IOException {
+      return (DummyFileSealReqV1)
+          new DummyFileSealReqV1().convertToTPipeTransferReq(fileName, 
fileLength);
+    }
+
+    @Override
+    protected PipeRequestType getPlanType() {
+      return PipeRequestType.TRANSFER_TS_FILE_SEAL;
+    }
+  }
+
   private static class DummyFileSealReqV2 extends PipeTransferFileSealReqV2 {
 
     static DummyFileSealReqV2 toTPipeTransferReq(

Reply via email to