This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 32bfeaf60cf Pipe/Load: Fix loaded files may be compacted before 
listening (#12816)
32bfeaf60cf is described below

commit 32bfeaf60cfa748a55190a81308b4ed5813a7c33
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 26 22:06:29 2024 +0800

    Pipe/Load: Fix loaded files may be compacted before listening (#12816)
---
 .../db/storageengine/dataregion/DataRegion.java    | 54 +++++++++++++---------
 1 file changed, 32 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 67eaad0988e..89d11f94adc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2703,10 +2703,12 @@ public class DataRegion implements IDataRegionForQuery {
    * @param isGeneratedByPipe whether the load tsfile request is generated by 
pipe
    */
   public void loadNewTsFile(
-      TsFileResource newTsFileResource, boolean deleteOriginFile, boolean 
isGeneratedByPipe)
+      final TsFileResource newTsFileResource,
+      final boolean deleteOriginFile,
+      final boolean isGeneratedByPipe)
       throws LoadFileException {
-    File tsfileToBeInserted = newTsFileResource.getTsFile();
-    long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
+    final File tsfileToBeInserted = newTsFileResource.getTsFile();
+    final long newFilePartitionId = 
newTsFileResource.getTimePartitionWithCheck();
 
     if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource)) {
       throw new LoadFileException(
@@ -2716,7 +2718,7 @@ public class DataRegion implements IDataRegionForQuery {
     writeLock("loadNewTsFile");
     try {
       newTsFileResource.setSeq(false);
-      String newFileName =
+      final String newFileName =
           getNewTsFileName(
               System.currentTimeMillis(),
               getAndSetNewVersion(newFilePartitionId, newTsFileResource),
@@ -2732,10 +2734,11 @@ public class DataRegion implements IDataRegionForQuery {
             fsFactory.getFile(tsfileToBeInserted.getParentFile(), 
newFileName));
       }
       loadTsFileToUnSequence(
-          tsfileToBeInserted, newTsFileResource, newFilePartitionId, 
deleteOriginFile);
-
-      PipeInsertionDataNodeListener.getInstance()
-          .listenToTsFile(dataRegionId, newTsFileResource, true, 
isGeneratedByPipe);
+          tsfileToBeInserted,
+          newTsFileResource,
+          newFilePartitionId,
+          deleteOriginFile,
+          isGeneratedByPipe);
 
       FileMetrics.getInstance()
           .addTsFile(
@@ -2769,7 +2772,7 @@ public class DataRegion implements IDataRegionForQuery {
       }
 
       logger.info("TsFile {} is successfully loaded in unsequence list.", 
newFileName);
-    } catch (DiskSpaceInsufficientException e) {
+    } catch (final DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to database processor {} because the 
disk space is insufficient.",
           tsfileToBeInserted.getAbsolutePath(),
@@ -2910,12 +2913,13 @@ public class DataRegion implements IDataRegionForQuery {
    * @return load the file successfully @UsedBy sync module, load external 
tsfile module.
    */
   private boolean loadTsFileToUnSequence(
-      File tsFileToLoad,
-      TsFileResource tsFileResource,
-      long filePartitionId,
-      boolean deleteOriginFile)
+      final File tsFileToLoad,
+      final TsFileResource tsFileResource,
+      final long filePartitionId,
+      final boolean deleteOriginFile,
+      boolean isGeneratedByPipe)
       throws LoadFileException, DiskSpaceInsufficientException {
-    File targetFile;
+    final File targetFile;
     targetFile =
         fsFactory.getFile(
             TierManager.getInstance().getNextFolderForTsFile(0, false),
@@ -2949,7 +2953,7 @@ public class DataRegion implements IDataRegionForQuery {
       } else {
         Files.copy(tsFileToLoad.toPath(), targetFile.toPath());
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       logger.error(
           "File renaming failed when loading tsfile. Origin: {}, Target: {}",
           tsFileToLoad.getAbsolutePath(),
@@ -2961,8 +2965,10 @@ public class DataRegion implements IDataRegionForQuery {
               tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), 
e.getMessage()));
     }
 
-    File resourceFileToLoad = fsFactory.getFile(tsFileToLoad.getAbsolutePath() 
+ RESOURCE_SUFFIX);
-    File targetResourceFile = fsFactory.getFile(targetFile.getAbsolutePath() + 
RESOURCE_SUFFIX);
+    final File resourceFileToLoad =
+        fsFactory.getFile(tsFileToLoad.getAbsolutePath() + RESOURCE_SUFFIX);
+    final File targetResourceFile =
+        fsFactory.getFile(targetFile.getAbsolutePath() + RESOURCE_SUFFIX);
     try {
       if (deleteOriginFile) {
         FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
@@ -2970,7 +2976,7 @@ public class DataRegion implements IDataRegionForQuery {
         Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
       }
 
-    } catch (IOException e) {
+    } catch (final IOException e) {
       logger.error(
           "File renaming failed when loading .resource file. Origin: {}, 
Target: {}",
           resourceFileToLoad.getAbsolutePath(),
@@ -2984,16 +2990,16 @@ public class DataRegion implements IDataRegionForQuery {
               e.getMessage()));
     }
 
-    File modFileToLoad =
+    final File modFileToLoad =
         fsFactory.getFile(tsFileToLoad.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX);
     if (modFileToLoad.exists()) {
       // when successfully loaded, the filepath of the resource will be 
changed to the IoTDB data
       // dir, so we can add a suffix to find the old modification file.
-      File targetModFile =
+      final File targetModFile =
           fsFactory.getFile(targetFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX);
       try {
         Files.deleteIfExists(targetModFile.toPath());
-      } catch (IOException e) {
+      } catch (final IOException e) {
         logger.warn("Cannot delete localModFile {}", targetModFile, e);
       }
       try {
@@ -3002,7 +3008,7 @@ public class DataRegion implements IDataRegionForQuery {
         } else {
           Files.copy(modFileToLoad.toPath(), targetModFile.toPath());
         }
-      } catch (IOException e) {
+      } catch (final IOException e) {
         logger.error(
             "File renaming failed when loading .mod file. Origin: {}, Target: 
{}",
             modFileToLoad.getAbsolutePath(),
@@ -3018,6 +3024,10 @@ public class DataRegion implements IDataRegionForQuery {
       }
     }
 
+    // Listen before the tsFile is added into tsFile manager to avoid it being 
compacted
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToTsFile(dataRegionId, tsFileResource, true, isGeneratedByPipe);
+
     // help tsfile resource degrade
     tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
 

Reply via email to