This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 494c5915cf1 Pipe: Fixed the potential leak file handle at the receiver
(#17589) (#17606)
494c5915cf1 is described below
commit 494c5915cf1cac476e1adb4c9e1887fd8562de4d
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:34:19 2026 +0800
Pipe: Fixed the potential leak file handle at the receiver (#17589) (#17606)
---
.../commons/pipe/receiver/IoTDBFileReceiver.java | 26 ++++-
.../pipe/receiver/IoTDBFileReceiverTest.java | 124 ++++++++++++++++++++-
2 files changed, 145 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 61d9155f996..7346ae2bccd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -126,6 +126,10 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
String.format(
"Pipe-Receiver-%s-%s:%s", receiverId.get(), getSenderHost(),
getSenderPort()));
+ // Handshake restarts the transfer session. Reset the current writing
state before recycling the
+ // old receiver dir, otherwise the old file handle can survive across
handshakes.
+ resetCurrentWritingFileState();
+
// Clear the original receiver file dir if exists
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
@@ -547,6 +551,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
}
+ private void resetCurrentWritingFileState() {
+ closeCurrentWritingFileWriter(false);
+ writingFile = null;
+ }
+
private void deleteFile(final File file) {
if (file.exists()) {
try {
@@ -589,6 +598,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
protected final TPipeTransferResp handleTransferFileSealV1(final
PipeTransferFileSealReqV1 req) {
+ File sealedWritingFile = null;
+ boolean shouldDeleteSealedFile = true;
try {
if (!isWritingFileAvailable()) {
final TSStatus status =
@@ -605,7 +616,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
return resp;
}
- final String fileAbsolutePath = writingFile.getAbsolutePath();
+ sealedWritingFile = writingFile;
+ final String fileAbsolutePath = sealedWritingFile.getAbsolutePath();
// Sync here is necessary to ensure that the data is written to the
disk. Or data region may
// load the file before the data is written to the disk and cause
unexpected behavior after
@@ -624,11 +636,13 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
writingFileWriter.close();
writingFileWriter = null;
- // writingFile will be deleted after load if no exception occurs
+ // Clear the reference before loading so the next file transfer can not
reuse the same path.
+ // The loader owns cleanup after a successful load.
writingFile = null;
final TSStatus status = loadFileV1(req, fileAbsolutePath);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ shouldDeleteSealedFile = false;
LOGGER.info(
"Receiver id = {}: Seal file {} successfully.", receiverId.get(),
fileAbsolutePath);
} else {
@@ -657,7 +671,13 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
// All pieces of the writing file and its mod (if exists) should be
retransmitted by the
// sender.
closeCurrentWritingFileWriter(false);
- deleteCurrentWritingFile();
+ if (shouldDeleteSealedFile) {
+ if (writingFile != null) {
+ deleteCurrentWritingFile();
+ } else if (sealedWritingFile != null) {
+ deleteFile(sealedWritingFile);
+ }
+ }
}
}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
index 337cece8a48..c692cb1513a 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
@@ -20,10 +20,12 @@
package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV1Req;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -33,6 +35,8 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
@@ -86,9 +90,61 @@ public class IoTDBFileReceiverTest {
}
}
+ @Test
+ public void testHandshakeResetsWritingFileState() throws Exception {
+ final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+ final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+ try {
+ receiver.handshake();
+ receiver.createWritingFile("normal.tsfile", true);
+ receiver.writeToCurrentWritingFile(new byte[] {1, 2, 3});
+
+ final File oldReceiverDir = receiver.getCurrentReceiverDir();
+ Assert.assertNotNull(receiver.getCurrentWritingFile());
+ Assert.assertNotNull(receiver.getCurrentWritingFileWriter());
+
+ receiver.handshake();
+
+ Assert.assertFalse(oldReceiverDir.exists());
+ Assert.assertNull(receiver.getCurrentWritingFile());
+ Assert.assertNull(receiver.getCurrentWritingFileWriter());
+ Assert.assertNotEquals(
+ oldReceiverDir.getAbsolutePath(),
receiver.getCurrentReceiverDir().getAbsolutePath());
+ } finally {
+ receiver.handleExit();
+ }
+ }
+
+ @Test
+ public void testSealFileV1FailureDeletesTransferredFile() throws Exception {
+ final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+ final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+ try {
+ receiver.createWritingFile("normal.tsfile", true);
+ receiver.writeToCurrentWritingFile(new byte[] {1, 2, 3});
+ receiver.setLoadFileV1Status(
+ new TSStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode()));
+
+ final File transferredFile =
receiver.getWritingFileInBaseDir("normal.tsfile");
+ final TPipeTransferResp response = receiver.sealFileV1("normal.tsfile",
3L);
+
+ Assert.assertEquals(
+ TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode(),
response.getStatus().getCode());
+ Assert.assertFalse(transferredFile.exists());
+ Assert.assertNull(receiver.getCurrentWritingFile());
+ Assert.assertNull(receiver.getCurrentWritingFileWriter());
+ } finally {
+ receiver.handleExit();
+ }
+ }
+
private static class DummyFileReceiver extends IoTDBFileReceiver {
+ private final File receiverFileBaseDir;
+ private TSStatus loadFileV1Status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+
DummyFileReceiver(final File baseDir) {
+ receiverFileBaseDir = baseDir;
receiverFileDirWithIdSuffix.set(baseDir);
}
@@ -96,6 +152,24 @@ public class IoTDBFileReceiverTest {
updateWritingFileIfNeeded(fileName, isSingleFile);
}
+ void handshake() throws IOException {
+ handleTransferHandshakeV1(
+ DummyHandshakeReq.toTPipeTransferReq(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+ }
+
+ void writeToCurrentWritingFile(final byte[] bytes) throws Exception {
+ getCurrentWritingFileWriter().write(bytes);
+ }
+
+ void setLoadFileV1Status(final TSStatus status) {
+ loadFileV1Status = status;
+ }
+
+ TPipeTransferResp sealFileV1(final String fileName, final long fileLength)
throws IOException {
+ return
handleTransferFileSealV1(DummyFileSealReqV1.toTPipeTransferReq(fileName,
fileLength));
+ }
+
TPipeTransferResp sealFiles(final List<String> fileNames, final List<Long>
fileLengths)
throws IOException {
return handleTransferFileSealV2(
@@ -106,9 +180,27 @@ public class IoTDBFileReceiverTest {
return
receiverFileDirWithIdSuffix.get().toPath().resolve(fileName).toFile();
}
+ File getCurrentReceiverDir() {
+ return receiverFileDirWithIdSuffix.get();
+ }
+
+ File getCurrentWritingFile() throws Exception {
+ return (File) getField("writingFile").get(this);
+ }
+
+ RandomAccessFile getCurrentWritingFileWriter() throws Exception {
+ return (RandomAccessFile) getField("writingFileWriter").get(this);
+ }
+
+ private Field getField(final String fieldName) throws NoSuchFieldException
{
+ final Field field = IoTDBFileReceiver.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field;
+ }
+
@Override
protected String getReceiverFileBaseDir() {
- return receiverFileDirWithIdSuffix.get().getAbsolutePath();
+ return receiverFileBaseDir.getAbsolutePath();
}
@Override
@@ -134,7 +226,7 @@ public class IoTDBFileReceiverTest {
@Override
protected TSStatus loadFileV1(
final PipeTransferFileSealReqV1 req, final String fileAbsolutePath) {
- return new TSStatus(200);
+ return loadFileV1Status;
}
@Override
@@ -155,6 +247,34 @@ public class IoTDBFileReceiverTest {
}
}
+ private static class DummyHandshakeReq extends PipeTransferHandshakeV1Req {
+
+ static DummyHandshakeReq toTPipeTransferReq(final String
timestampPrecision)
+ throws IOException {
+ return (DummyHandshakeReq)
+ new
DummyHandshakeReq().convertToTPipeTransferReq(timestampPrecision);
+ }
+
+ @Override
+ protected PipeRequestType getPlanType() {
+ return PipeRequestType.HANDSHAKE_DATANODE_V1;
+ }
+ }
+
+ private static class DummyFileSealReqV1 extends PipeTransferFileSealReqV1 {
+
+ static DummyFileSealReqV1 toTPipeTransferReq(final String fileName, final
long fileLength)
+ throws IOException {
+ return (DummyFileSealReqV1)
+ new DummyFileSealReqV1().convertToTPipeTransferReq(fileName,
fileLength);
+ }
+
+ @Override
+ protected PipeRequestType getPlanType() {
+ return PipeRequestType.TRANSFER_TS_FILE_SEAL;
+ }
+ }
+
private static class DummyFileSealReqV2 extends PipeTransferFileSealReqV2 {
static DummyFileSealReqV2 toTPipeTransferReq(