This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new aaf5139029e refine tsfileWriter reset logic (#16309)
aaf5139029e is described below

commit aaf5139029e7533191701d370338783461afac1d
Author: Peng Junzhi <[email protected]>
AuthorDate: Sat Aug 30 21:11:23 2025 -0500

    refine tsfileWriter reset logic (#16309)
    
    Co-authored-by: 彭俊植 <[email protected]>
---
 .../pipeconsensus/PipeConsensusReceiver.java       | 304 ++++++++-------------
 1 file changed, 116 insertions(+), 188 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 9549eba9adb..2b116d2e4d8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -448,17 +448,8 @@ public class PipeConsensusReceiver {
       writingFileWriter.getFD().sync();
       // 1. The writing file writer must be closed, otherwise it may cause 
concurrent errors during
       // the process of loading tsfile when parsing tsfile.
-      //
-      // 2. The writing file must be set to null, otherwise if the next passed 
tsfile has the same
-      // name as the current tsfile, it will bypass the judgment logic of
-      // updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue 
to write to the already
-      // loaded file. Since the writing file writer has already been closed, 
it will throw a Stream
-      // Close exception.
+      // 2. writingFileWriter and writingFile will be reset in 
`releaseTsFileWriter`
       writingFileWriter.close();
-      tsFileWriter.setWritingFileWriter(null);
-
-      // writingFile will be deleted after load if no exception occurs
-      tsFileWriter.setWritingFile(null);
 
       long endPreCheckNanos = System.nanoTime();
       pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(
@@ -567,17 +558,8 @@ public class PipeConsensusReceiver {
       writingFileWriter.getFD().sync();
       // 1. The writing file writer must be closed, otherwise it may cause 
concurrent errors during
       // the process of loading tsfile when parsing tsfile.
-      //
-      // 2. The writing file must be set to null, otherwise if the next passed 
tsfile has the same
-      // name as the current tsfile, it will bypass the judgment logic of
-      // updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue 
to write to the already
-      // loaded file. Since the writing file writer has already been closed, 
it will throw a Stream
-      // Close exception.
+      // 2. writingFileWriter and writingFile will be reset in 
`releaseTsFileWriter`
       writingFileWriter.close();
-      tsFileWriter.setWritingFileWriter(null);
-
-      // WritingFile will be deleted after load if no exception occurs
-      tsFileWriter.setWritingFile(null);
 
       final List<String> fileAbsolutePaths =
           
files.stream().map(File::getAbsolutePath).collect(Collectors.toList());
@@ -628,24 +610,6 @@ public class PipeConsensusReceiver {
     }
   }
 
-  private void releaseTsFileWriter(
-      PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
-    if (tsFileWriter == null) {
-      return;
-    }
-    closeCurrentWritingFileWriter(tsFileWriter, fsyncBeforeClose);
-    deleteCurrentWritingFile(tsFileWriter);
-    try {
-      tsFileWriter.returnSelf(consensusPipeName);
-    } catch (IOException | DiskSpaceInsufficientException e) {
-      LOGGER.warn(
-          "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
-          consensusPipeName,
-          tsFileWriter,
-          e);
-    }
-  }
-
   private TPipeConsensusTransferResp checkNonFinalFileSeal(
       final PipeConsensusTsFileWriter tsFileWriter,
       final File file,
@@ -846,100 +810,13 @@ public class PipeConsensusReceiver {
     return !offsetCorrect;
   }
 
-  private void closeCurrentWritingFileWriter(
-      PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
-    if (tsFileWriter.getWritingFileWriter() != null) {
-      try {
-        if (fsyncBeforeClose) {
-          tsFileWriter.getWritingFileWriter().getFD().sync();
-        }
-        tsFileWriter.getWritingFileWriter().close();
-        LOGGER.info(
-            "PipeConsensus-PipeName-{}: Current writing file writer {} was 
closed.",
-            consensusPipeName,
-            tsFileWriter.getWritingFile() == null
-                ? "null"
-                : tsFileWriter.getWritingFile().getPath());
-        tsFileWriter.setWritingFileWriter(null);
-      } catch (IOException e) {
-        LOGGER.warn(
-            "PipeConsensus-PipeName-{}: Failed to close current writing file 
writer {}, because {}.",
-            consensusPipeName,
-            tsFileWriter.getWritingFile() == null
-                ? "null"
-                : tsFileWriter.getWritingFile().getPath(),
-            e.getMessage(),
-            e);
-      }
-    } else {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "PipeConsensus-PipeName-{}: Current writing file writer is null. 
No need to close.",
-            consensusPipeName.toString());
-      }
-    }
-  }
-
-  private void deleteFileOrDirectoryIfExists(File file, String reason) {
-    if (file.exists()) {
-      try {
-        if (file.isDirectory()) {
-          RetryUtils.retryOnException(
-              () -> {
-                FileUtils.deleteDirectory(file);
-                return null;
-              });
-        } else {
-          RetryUtils.retryOnException(() -> FileUtils.delete(file));
-        }
-        LOGGER.info(
-            "PipeConsensus-PipeName-{}: {} {} was deleted.",
-            consensusPipeName,
-            reason,
-            file.getPath());
-      } catch (IOException e) {
-        LOGGER.warn(
-            "PipeConsensus-PipeName-{}: {} Failed to delete {}, because {}.",
-            consensusPipeName,
-            reason,
-            file.getPath(),
-            e.getMessage(),
-            e);
-      }
-    } else {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "PipeConsensus-PipeName-{}: {} {} is not existed. No need to 
delete.",
-            consensusPipeName,
-            reason,
-            file.getPath());
-      }
-    }
-  }
-
-  private void deleteCurrentWritingFile(PipeConsensusTsFileWriter 
tsFileWriter) {
-    if (tsFileWriter.getLocalWritingDir() != null) {
-      try {
-        // There may be multiple files such as mods and tsfile pieces in the 
dir. Here we clean the
-        // dir instead of deleting it to avoid repeatedly deleting and 
creating the base dir for
-        // tsfile writer
-        FileUtils.cleanDirectory(tsFileWriter.getLocalWritingDir());
-      } catch (IOException e) {
-        LOGGER.warn(
-            "PipeConsensus-PipeName-{}: Failed to clean current writing file 
dir {}.",
-            consensusPipeName,
-            tsFileWriter.getLocalWritingDir().getPath(),
-            e);
-      }
-    }
-  }
-
   private void updateWritingFileIfNeeded(
       final PipeConsensusTsFileWriter tsFileWriter,
       final String fileName,
       final boolean isSingleFile)
       throws IOException {
-    if (isFileExistedAndNameCorrect(tsFileWriter, fileName)) {
+    if (isFileExistedAndNameCorrect(tsFileWriter, fileName)
+        && tsFileWriter.getWritingFileWriter() != null) {
       return;
     }
 
@@ -954,7 +831,10 @@ public class PipeConsensusReceiver {
     // If there are multiple files we can not delete the current file
     // instead they will be deleted after seal request
     if (tsFileWriter.getWritingFile() != null && isSingleFile) {
-      deleteCurrentWritingFile(tsFileWriter);
+      deleteFileOrDirectoryIfExists(
+          tsFileWriter.getWritingFile(),
+          false,
+          String.format("Update TsFileWriter-%s", tsFileWriter.index));
     }
 
     // Make sure receiver file dir exists
@@ -1002,7 +882,8 @@ public class PipeConsensusReceiver {
                 consensusPipeName, newReceiverDir.getPath()));
       }
       // Remove exists dir
-      deleteFileOrDirectoryIfExists(newReceiverDir, "Initial Receiver: delete 
origin receive dir");
+      deleteFileOrDirectoryIfExists(
+          newReceiverDir, true, "Initial Receiver: delete origin receive dir");
 
       if (!newReceiverDir.mkdirs()) {
         LOGGER.warn(
@@ -1022,7 +903,7 @@ public class PipeConsensusReceiver {
     // Clear the original receiver file dir if exists
     for (String receiverFileBaseDir : receiveDirs) {
       File receiverDir = new File(receiverFileBaseDir);
-      deleteFileOrDirectoryIfExists(receiverDir, "Clear receive dir manually");
+      deleteFileOrDirectoryIfExists(receiverDir, true, "Clear receive dir 
manually");
     }
   }
 
@@ -1146,20 +1027,11 @@ public class PipeConsensusReceiver {
               writer -> {
                 if (System.currentTimeMillis() - writer.lastUsedTs
                     >= IOTDB_CONFIG.getTsFileWriterZombieThreshold()) {
-                  try {
-                    writer.closeSelf(consensusPipeName);
-                    writer.returnSelf(consensusPipeName);
-                    LOGGER.info(
-                        "PipeConsensus-PipeName-{}: tsfile writer-{} is 
cleaned up because no new requests were received for too long.",
-                        consensusPipeName,
-                        writer.index);
-                  } catch (IOException | DiskSpaceInsufficientException e) {
-                    LOGGER.warn(
-                        "PipeConsensus-PipeName-{}: receiver watch dog failed 
to return tsFileWriter-{}.",
-                        consensusPipeName.toString(),
-                        writer.index,
-                        e);
-                  }
+                  releaseTsFileWriter(writer, false);
+                  LOGGER.info(
+                      "PipeConsensus-PipeName-{}: tsfile writer-{} is cleaned 
up because no new requests were received for too long.",
+                      consensusPipeName,
+                      writer.index);
                 }
               });
     }
@@ -1183,17 +1055,7 @@ public class PipeConsensusReceiver {
                 break;
               }
             }
-
-            try {
-              tsFileWriter.closeSelf(consensusPipeName);
-              tsFileWriter.returnSelf(consensusPipeName);
-            } catch (IOException | DiskSpaceInsufficientException e) {
-              LOGGER.warn(
-                  "PipeConsensus-PipeName-{}: receiver thread failed to return 
tsFileWriter-{} when exiting.",
-                  consensusPipeName.toString(),
-                  tsFileWriter.index,
-                  e);
-            }
+            releaseTsFileWriter(tsFileWriter, false);
           });
     }
   }
@@ -1234,6 +1096,7 @@ public class PipeConsensusReceiver {
                 File writingDir = new File(receiverBasePath + File.separator + 
index);
                 deleteFileOrDirectoryIfExists(
                     writingDir,
+                    true,
                     String.format(
                         "TsFileWriter-%s roll to new dir and delete last 
writing dir", index));
 
@@ -1339,45 +1202,110 @@ public class PipeConsensusReceiver {
           consensusPipeName.toString(),
           index);
     }
+  }
 
-    public void closeSelf(ConsensusPipeName consensusPipeName) {
-      // close file writer
-      if (writingFileWriter != null) {
-        try {
-          writingFileWriter.close();
-          LOGGER.info(
-              "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file 
writer was closed.",
-              consensusPipeName.toString(),
-              index);
-          setWritingFileWriter(null);
-        } catch (Exception e) {
-          LOGGER.warn(
-              "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Close Writing 
file writer error.",
-              consensusPipeName,
-              index,
-              e);
-        }
-      } else {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug(
-              "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file 
writer is null. No need to close.",
-              consensusPipeName.toString(),
-              index);
+  private void closeCurrentWritingFileWriter(
+      PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
+    if (tsFileWriter.getWritingFileWriter() != null) {
+      try {
+        if (fsyncBeforeClose) {
+          tsFileWriter.getWritingFileWriter().getFD().sync();
         }
+        tsFileWriter.getWritingFileWriter().close();
+        LOGGER.info(
+            "PipeConsensus-PipeName-{}: Current writing file writer {} was 
closed.",
+            consensusPipeName,
+            tsFileWriter.getWritingFile() == null
+                ? "null"
+                : tsFileWriter.getWritingFile().getPath());
+        tsFileWriter.setWritingFileWriter(null);
+      } catch (IOException e) {
+        LOGGER.warn(
+            "PipeConsensus-PipeName-{}: Failed to close current writing file 
writer {}, because {}.",
+            consensusPipeName,
+            tsFileWriter.getWritingFile() == null
+                ? "null"
+                : tsFileWriter.getWritingFile().getPath(),
+            e.getMessage(),
+            e);
       }
+    } else {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "PipeConsensus-PipeName-{}: Current writing file writer is null. 
No need to close.",
+            consensusPipeName.toString());
+      }
+    }
+  }
 
-      // close file
-      if (writingFile != null) {
-        deleteFileOrDirectoryIfExists(
-            writingFile, String.format("TsFileWriter-%s exit: delete writing 
file", this.index));
-        setWritingFile(null);
-      } else {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug(
-              "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file is 
null. No need to delete.",
-              consensusPipeName.toString());
+  private void deleteFileOrDirectoryIfExists(File file, boolean deleteDir, 
String reason) {
+    if (file.exists()) {
+      try {
+        if (file.isDirectory()) {
+          if (deleteDir) {
+            RetryUtils.retryOnException(
+                () -> {
+                  FileUtils.deleteDirectory(file);
+                  return null;
+                });
+          } else {
+            // There may be multiple files such as mods and tsfile pieces in 
the dir. Here we clean
+            // the
+            // dir instead of deleting it to avoid repeatedly deleting and 
creating the base dir for
+            // tsfile writer
+            RetryUtils.retryOnException(
+                () -> {
+                  FileUtils.cleanDirectory(file);
+                  return null;
+                });
+          }
+        } else {
+          RetryUtils.retryOnException(() -> FileUtils.delete(file));
         }
+        LOGGER.info(
+            "PipeConsensus-PipeName-{}: {} {} was deleted.",
+            consensusPipeName,
+            reason,
+            file.getPath());
+      } catch (IOException e) {
+        LOGGER.warn(
+            "PipeConsensus-PipeName-{}: {} Failed to delete {}, because {}.",
+            consensusPipeName,
+            reason,
+            file.getPath(),
+            e.getMessage(),
+            e);
       }
+    } else {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "PipeConsensus-PipeName-{}: {} {} is not existed. No need to 
delete.",
+            consensusPipeName,
+            reason,
+            file.getPath());
+      }
+    }
+  }
+
+  private void releaseTsFileWriter(
+      PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
+    if (tsFileWriter == null) {
+      return;
+    }
+    closeCurrentWritingFileWriter(tsFileWriter, fsyncBeforeClose);
+    deleteFileOrDirectoryIfExists(
+        tsFileWriter.getLocalWritingDir(),
+        false,
+        String.format("Release TsFileWriter-%s", tsFileWriter.index));
+    tsFileWriter.setWritingFile(null);
+    try {
+      tsFileWriter.returnSelf(consensusPipeName);
+    } catch (IOException | DiskSpaceInsufficientException e) {
+      LOGGER.warn(
+          "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
+          consensusPipeName,
+          tsFileWriter,
+          e);
     }
   }
 

Reply via email to