This is an automated email from the ASF dual-hosted git repository.

pengjunzhi pushed a commit to branch fixFileWriter
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8fba211c8f05f044776782d0d56f5f9f7c325b41
Author: 彭俊植 <[email protected]>
AuthorDate: Wed Aug 13 19:19:56 2025 +0800

    fix RandomAccessFileWriter
---
 .../pipeconsensus/PipeConsensusReceiver.java       | 93 ++++++++++------------
 1 file changed, 43 insertions(+), 50 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 168e6406745..96018319bea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -168,6 +168,9 @@ public class PipeConsensusReceiver {
     // will reject synchronization.
     TPipeConsensusTransferResp resp = preCheckForReceiver(req);
     if (resp != null) {
+      // release tsFileWriter if pre-check failed as leader will resend the 
whole tsFileEvent
+      releaseTsFileWriter(
+          
pipeConsensusTsFileWriterPool.tryToFindCorrespondingWriter(req.getCommitId()), 
false);
       return resp;
     }
 
@@ -400,20 +403,8 @@ public class PipeConsensusReceiver {
         } finally {
           // Exception may occur when disk system go wrong. At this time, we 
may reset all resource
           // and receive this file from scratch when leader will try to resend 
this file from
-          // scratch
-          // as well.
-          closeCurrentWritingFileWriter(tsFileWriter, false);
-          deleteCurrentWritingFile(tsFileWriter);
-          // must return tsfileWriter after deleting its file.
-          try {
-            tsFileWriter.returnSelf(consensusPipeName);
-          } catch (IOException | DiskSpaceInsufficientException 
returnException) {
-            LOGGER.warn(
-                "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
-                consensusPipeName,
-                tsFileWriter,
-                returnException);
-          }
+          // scratch as well.
+          releaseTsFileWriter(tsFileWriter, false);
         }
       }
     } finally {
@@ -432,7 +423,6 @@ public class PipeConsensusReceiver {
     File writingFile = tsFileWriter.getWritingFile();
     RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
 
-    boolean isReturnTsFileWriter = false;
     try {
       if (isWritingFileNonAvailable(tsFileWriter)) {
         final TSStatus status =
@@ -481,8 +471,6 @@ public class PipeConsensusReceiver {
       pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() 
- endPreCheckNanos);
 
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        // if transfer success, disk buffer will be released.
-        isReturnTsFileWriter = true;
         LOGGER.info(
             "PipeConsensus-PipeName-{}: Seal file {} successfully.",
             consensusPipeName,
@@ -521,20 +509,7 @@ public class PipeConsensusReceiver {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod (if exists) should be 
retransmitted by the
       // sender.
-      closeCurrentWritingFileWriter(tsFileWriter, false);
-      deleteCurrentWritingFile(tsFileWriter);
-      // must return tsfileWriter after deleting its file.
-      if (isReturnTsFileWriter) {
-        try {
-          tsFileWriter.returnSelf(consensusPipeName);
-        } catch (IOException | DiskSpaceInsufficientException e) {
-          LOGGER.warn(
-              "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
-              consensusPipeName,
-              tsFileWriter,
-              e);
-        }
-      }
+      releaseTsFileWriter(tsFileWriter, false);
     }
   }
 
@@ -557,7 +532,6 @@ public class PipeConsensusReceiver {
         req.getFileNames().stream()
             .map(fileName -> new File(currentWritingDirPath, fileName))
             .collect(Collectors.toList());
-    boolean isReturnTsFileWriter = false;
     try {
       if (isWritingFileNonAvailable(tsFileWriter)) {
         final TSStatus status =
@@ -620,8 +594,6 @@ public class PipeConsensusReceiver {
       pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() 
- endPreCheckNanos);
 
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        // if transfer success, disk buffer will be released.
-        isReturnTsFileWriter = true;
         LOGGER.info(
             "PipeConsensus-PipeName-{}: Seal file with mods {} successfully.",
             consensusPipeName,
@@ -649,22 +621,28 @@ public class PipeConsensusReceiver {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod(if exists) should be 
retransmitted by the
       // sender.
-      closeCurrentWritingFileWriter(tsFileWriter, false);
+      releaseTsFileWriter(tsFileWriter, false);
       // Clear the directory instead of only deleting the referenced files in 
seal request
       // to avoid previously undeleted file being redundant when transferring 
multi files
       IoTDBReceiverAgent.cleanPipeReceiverDir(currentWritingDirPath);
-      // must return tsfileWriter after deleting its file.
-      if (isReturnTsFileWriter) {
-        try {
-          tsFileWriter.returnSelf(consensusPipeName);
-        } catch (IOException | DiskSpaceInsufficientException e) {
-          LOGGER.warn(
-              "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
-              consensusPipeName,
-              tsFileWriter,
-              e);
-        }
-      }
+    }
+  }
+
+  private void releaseTsFileWriter(
+      PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
+    if (tsFileWriter == null) {
+      return;
+    }
+    closeCurrentWritingFileWriter(tsFileWriter, fsyncBeforeClose);
+    deleteCurrentWritingFile(tsFileWriter);
+    try {
+      tsFileWriter.returnSelf(consensusPipeName);
+    } catch (IOException | DiskSpaceInsufficientException e) {
+      LOGGER.warn(
+          "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
+          consensusPipeName,
+          tsFileWriter,
+          e);
     }
   }
 
@@ -882,6 +860,7 @@ public class PipeConsensusReceiver {
             tsFileWriter.getWritingFile() == null
                 ? "null"
                 : tsFileWriter.getWritingFile().getPath());
+        tsFileWriter.setWritingFileWriter(null);
       } catch (IOException e) {
         LOGGER.warn(
             "PipeConsensus-PipeName-{}: Failed to close current writing file 
writer {}, because {}.",
@@ -892,7 +871,6 @@ public class PipeConsensusReceiver {
             e.getMessage(),
             e);
       }
-      tsFileWriter.setWritingFileWriter(null);
     } else {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
@@ -1107,6 +1085,17 @@ public class PipeConsensusReceiver {
           IOTDB_CONFIG.getTsFileWriterCheckInterval());
     }
 
+    public PipeConsensusTsFileWriter tryToFindCorrespondingWriter(TCommitId 
commitId) {
+      Optional<PipeConsensusTsFileWriter> tsFileWriter =
+          pipeConsensusTsFileWriterPool.stream()
+              .filter(
+                  item ->
+                      item.isUsed()
+                          && Objects.equals(commitId, 
item.getCommitIdOfCorrespondingHolderEvent()))
+              .findFirst();
+      return tsFileWriter.orElse(null);
+    }
+
     @SuppressWarnings("java:S3655")
     public PipeConsensusTsFileWriter borrowCorrespondingWriter(TCommitId 
commitId) {
       Optional<PipeConsensusTsFileWriter> tsFileWriter =
@@ -1291,13 +1280,17 @@ public class PipeConsensusReceiver {
       return writingFileWriter;
     }
 
-    public void setWritingFileWriter(RandomAccessFile writingFileWriter) {
+    public void setWritingFileWriter(RandomAccessFile writingFileWriter) 
throws IOException {
       this.writingFileWriter = writingFileWriter;
       if (writingFileWriter == null) {
         LOGGER.info(
             "PipeConsensus-{}: TsFileWriter-{} set null writing file writer",
             consensusPipeName.toString(),
             index);
+      } else {
+        // seek to the end of the file to ensure that the next piece will be 
appended to the end of
+        // the file.
+        this.writingFileWriter.seek(this.writingFileWriter.length());
       }
     }
 
@@ -1353,6 +1346,7 @@ public class PipeConsensusReceiver {
               "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file 
writer was closed.",
               consensusPipeName.toString(),
               index);
+          setWritingFileWriter(null);
         } catch (Exception e) {
           LOGGER.warn(
               "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Close Writing 
file writer error.",
@@ -1360,7 +1354,6 @@ public class PipeConsensusReceiver {
               index,
               e);
         }
-        setWritingFileWriter(null);
       } else {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug(

Reply via email to