This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aabccb74fee3f7a9c42f1f8185e7b7aa7a47682d
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 22 09:34:19 2025 +0800

    Pipe: Fixed some errors in cherry-picking (#15984)
    
    * Update TsFileProcessor.java
    
    * fix-cp-2
    
    * fix-fix
    
    * Update PipeTsFileResourceManager.java
    
    * Update PipeTsFileResourceManager.java
    
    (cherry picked from commit f076824eb057f50e8e8800bdd7e709da7868e48c)
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  6 ---
 .../realtime/assigner/PipeDataRegionAssigner.java  |  9 ----
 .../resource/tsfile/PipeTsFileResourceManager.java | 60 +++++++++++-----------
 .../dataregion/memtable/TsFileProcessor.java       |  7 ---
 4 files changed, 31 insertions(+), 51 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 9b091a88f51..329a123c489 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -557,12 +557,6 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
         + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath) : 0)
         + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
         + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)
-        + (Objects.nonNull(treeModelDatabaseName)
-            ? RamUsageEstimator.sizeOf(treeModelDatabaseName)
-            : 0)
-        + (Objects.nonNull(tableModelDatabaseName)
-            ? RamUsageEstimator.sizeOf(tableModelDatabaseName)
-            : 0)
         + (Objects.nonNull(tableNames)
             ? SET_SIZE
                 + tableNames.stream().mapToLong(RamUsageEstimator::sizeOf).reduce(0L, Long::sum)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 6238117ed89..366a17bdd79 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -20,9 +20,6 @@
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
@@ -51,14 +48,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeDataRegionAssigner implements Closeable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionAssigner.class);
 
-  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
-
   /**
    * The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the
    * pattern.
@@ -72,9 +66,6 @@ public class PipeDataRegionAssigner implements Closeable {
 
   private Boolean isTableModel;
 
-  private final AtomicReference<ProgressIndex> maxProgressIndexForRealtimeEvent =
-      new AtomicReference<>(MinimumProgressIndex.INSTANCE);
-
   private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();
 
   public String getDataRegionId() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 31bc403b5b9..d1d1286991b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -55,6 +55,11 @@ public class PipeTsFileResourceManager {
       hardlinkOrCopiedFileToPipeTsFileResourceMap = new ConcurrentHashMap<>();
   private final PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock();
 
+  public File increaseFileReference(
+      final File file, final boolean isTsFile, final @Nullable String pipeName) throws IOException {
+    return increaseFileReference(file, isTsFile, pipeName, null);
+  }
+
   /**
    * Given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the
    * hardlink or copied file, and return the hardlink or copied file.
@@ -71,16 +76,24 @@ public class PipeTsFileResourceManager {
    * @param file tsfile, resource file or mod file. can be original file or hardlink/copy of
    *     original file
    * @param isTsFile {@code true} to create hardlink, {@code false} to copy file
+   * @param pipeName Nonnull if the pipe is from historical or assigner -> extractors, null if is
+   *     dataRegion -> assigner
+   * @param sourceFile for inner use, historical extractor will use this to create hardlink from
+   *     pipe tsFile -> common tsFile
    * @return the hardlink or copied file
    * @throws IOException when create hardlink or copy file failed
    */
-  public File increaseFileReference(
-      final File file, final boolean isTsFile, final @Nullable String pipeName) throws IOException {
+  private File increaseFileReference(
+      final File file,
+      final boolean isTsFile,
+      final @Nullable String pipeName,
+      final @Nullable File sourceFile)
+      throws IOException {
     // If the file is already a hardlink or copied file,
     // just increase reference count and return it
     segmentLock.lock(file);
     try {
-      if (increaseReferenceIfExists(file, pipeName)) {
+      if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
         return file;
       }
     } finally {
@@ -89,19 +102,22 @@ public class PipeTsFileResourceManager {
 
     // If the file is not a hardlink or copied file, check if there is a related hardlink or
     // copied file in pipe dir. if so, increase reference count and return it
-    final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file, pipeName);
+    final File hardlinkOrCopiedFile =
+        Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, pipeName) : file;
     segmentLock.lock(hardlinkOrCopiedFile);
     try {
-      if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName)) {
+      if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
         return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
       }
 
       // If the file is a tsfile, create a hardlink in pipe dir and will return it.
       // otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
+      final File source = Objects.isNull(sourceFile) ? file : sourceFile;
+
       final File resultFile =
           isTsFile
-              ? FileUtils.createHardLink(file, hardlinkOrCopiedFile)
-              : FileUtils.copyFile(file, hardlinkOrCopiedFile);
+              ? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
+              : FileUtils.copyFile(source, hardlinkOrCopiedFile);
 
       // If the file is not a hardlink or copied file, and there is no related hardlink or copied
       // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for
@@ -115,7 +131,7 @@ public class PipeTsFileResourceManager {
             resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
       }
 
-      increasePublicReference(resultFile, pipeName);
+      increasePublicReference(resultFile, pipeName, isTsFile);
 
       return resultFile;
     } finally {
@@ -123,34 +139,26 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  private boolean increaseReferenceIfExists(final File file, final @Nullable String pipeName) {
+  private boolean increaseReferenceIfExists(
+      final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException {
     final String path = file.getPath();
     final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
     if (resource != null) {
       resource.increaseReferenceCount();
-      increasePublicReference(file, pipeName);
+      increasePublicReference(file, pipeName, isTsFile);
       return true;
     }
     return false;
   }
 
-  private void increasePublicReference(final File file, final String pipeName) {
+  private void increasePublicReference(
+      final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException {
     if (Objects.isNull(pipeName)) {
       return;
     }
     // Increase the assigner's file to avoid hard-link or memory cache cleaning
     // Note that it does not exist for historical files
-    final String path = getCommonFilePath(file);
-    hardlinkOrCopiedFileToTsFilePublicResourceMap.compute(
-        path,
-        (k, v) -> {
-          if (Objects.isNull(v)) {
-            return new PipeTsFilePublicResource(new File(path));
-          } else {
-            v.increaseReferenceCount();
-            return v;
-          }
-        });
+    increaseFileReference(new File(getCommonFilePath(file)), isTsFile, null, file);
   }
 
   public static File getHardlinkOrCopiedFileInPipeDir(
@@ -227,13 +235,7 @@ public class PipeTsFileResourceManager {
     }
     // Increase the assigner's file to avoid hard-link or memory cache cleaning
     // Note that it does not exist for historical files
-    final String commonFilePath = getCommonFilePath(file);
-    if (hardlinkOrCopiedFileToTsFilePublicResourceMap.containsKey(commonFilePath)
-        && hardlinkOrCopiedFileToTsFilePublicResourceMap
-            .get(commonFilePath)
-            .decreaseReferenceCount()) {
-      hardlinkOrCopiedFileToPipeTsFileResourceMap.remove(commonFilePath);
-    }
+    decreaseFileReference(new File(getCommonFilePath(file)), null);
   }
 
   // Warning: Shall not be called by the assigner
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index cdf66b2da3c..9ac6f05df0a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1302,13 +1302,6 @@ public class TsFileProcessor {
       tsFileResource.setGeneratedByPipe(isTotallyGeneratedByPipe.get());
 
       try {
-        PipeInsertionDataNodeListener.getInstance()
-            .listenToTsFile(
-                dataRegionInfo.getDataRegion().getDataRegionId(),
-                dataRegionInfo.getDataRegion().getDatabaseName(),
-                tsFileResource,
-                false);
-
         // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
         // flushing memTable in System module.
         Future<?> future = addAMemtableIntoFlushingList(tmpMemTable);

Reply via email to