This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c6b90a9812 [Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes (#17472)
9c6b90a9812 is described below

commit 9c6b90a9812eeb2b95c7acd3faa6df7fc49dd2de
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 17 10:56:09 2026 +0800

    [Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes (#17472)
    
    * Pipe: deduplicate historical tsfile events per task scope
    
    * Pipe: address historical tsfile dedup review comments
    
    * spotless
    
    * Pipe: fix dedup scope cleanup and historical skip loop
    
    * spotless
    
    * Refine historical tsfile dedup supply semantics
    
    * spotless
---
 .../tsfile/PipeCompactedTsFileInsertionEvent.java  |   5 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  57 ++++---
 ...istoricalDataRegionTsFileAndDeletionSource.java |  87 +++++++++--
 .../PipeRealtimeDataRegionHybridSource.java        |   9 +-
 .../realtime/PipeRealtimeDataRegionSource.java     |  10 +-
 .../PipeRealtimeDataRegionTsFileSource.java        |   7 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |   1 +
 .../PipeTsFileEpochProgressIndexKeeper.java        |  78 +++++++--
 .../pipe/event/PipeTsFileInsertionEventTest.java   |  99 ++++++++++++
 ...ricalDataRegionTsFileAndDeletionSourceTest.java | 174 +++++++++++++++++++++
 .../PipeTsFileEpochProgressIndexKeeperTest.java    | 117 ++++++++++++++
 11 files changed, 585 insertions(+), 59 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
index 343c8d89329..95ff0a25373 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
@@ -91,6 +91,7 @@ public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent
     // init fields of PipeTsFileInsertionEvent
     flushPointCount = bindFlushPointCount(originalEvents);
     overridingProgressIndex = bindOverridingProgressIndex(originalEvents);
+    bindTsFileDedupScopeID(anyOfOriginalEvents.getTsFileDedupScopeID());
   }
 
   private static boolean bindIsWithMod(Set<PipeTsFileInsertionEvent> 
originalEvents) {
@@ -184,10 +185,10 @@ public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent
 
   @Override
   public void eliminateProgressIndex() {
-    if (Objects.isNull(overridingProgressIndex)) {
+    if (Objects.isNull(overridingProgressIndex) && 
Objects.nonNull(getTsFileDedupScopeID())) {
       for (final String originFilePath : originFilePaths) {
         PipeTsFileEpochProgressIndexKeeper.getInstance()
-            .eliminateProgressIndex(dataRegionId, pipeName, originFilePath);
+            .eliminateProgressIndex(dataRegionId, getTsFileDedupScopeID(), 
originFilePath);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 1505e15996f..adddc9d7ce5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -99,6 +99,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
 
   protected volatile ProgressIndex overridingProgressIndex;
   private Set<String> tableNames;
+  private String tsFileDedupScopeID;
 
   // This is set to check the tsFile paths by privilege
   private Map<IDeviceID, String[]> treeSchemaMap;
@@ -398,13 +399,26 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
   }
 
   public void eliminateProgressIndex() {
-    if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
+    if (Objects.isNull(overridingProgressIndex)
+        && Objects.nonNull(resource)
+        && Objects.nonNull(tsFileDedupScopeID)) {
       PipeTsFileEpochProgressIndexKeeper.getInstance()
           .eliminateProgressIndex(
-              Integer.parseInt(resource.getDataRegionId()), pipeName, 
resource.getTsFilePath());
+              Integer.parseInt(resource.getDataRegionId()),
+              tsFileDedupScopeID,
+              resource.getTsFilePath());
     }
   }
 
+  public PipeTsFileInsertionEvent bindTsFileDedupScopeID(final String 
tsFileDedupScopeID) {
+    this.tsFileDedupScopeID = tsFileDedupScopeID;
+    return this;
+  }
+
+  public String getTsFileDedupScopeID() {
+    return tsFileDedupScopeID;
+  }
+
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
@@ -419,25 +433,26 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
       final long startTime,
       final long endTime) {
     return new PipeTsFileInsertionEvent(
-        getRawIsTableModelEvent(),
-        getSourceDatabaseNameFromDataRegion(),
-        resource,
-        tsFile,
-        isWithMod,
-        isLoaded,
-        isGeneratedByHistoricalExtractor,
-        tableNames,
-        pipeName,
-        creationTime,
-        pipeTaskMeta,
-        treePattern,
-        tablePattern,
-        userId,
-        userName,
-        cliHostname,
-        skipIfNoPrivileges,
-        startTime,
-        endTime);
+            getRawIsTableModelEvent(),
+            getSourceDatabaseNameFromDataRegion(),
+            resource,
+            tsFile,
+            isWithMod,
+            isLoaded,
+            isGeneratedByHistoricalExtractor,
+            tableNames,
+            pipeName,
+            creationTime,
+            pipeTaskMeta,
+            treePattern,
+            tablePattern,
+            userId,
+            userName,
+            cliHostname,
+            skipIfNoPrivileges,
+            startTime,
+            endTime)
+        .bindTsFileDedupScopeID(tsFileDedupScopeID);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 66f8d48ce28..2ca283b1395 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -50,6 +50,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -124,6 +125,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
 
   private String pipeName;
   private long creationTime;
+  private String tsFileDedupScopeID;
 
   private PipeTaskMeta pipeTaskMeta;
   private ProgressIndex startIndex;
@@ -320,6 +322,14 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
     }
 
     dataRegionId = environment.getRegionId();
+    tsFileDedupScopeID =
+        pipeName
+            + "_"
+            + dataRegionId
+            + "_"
+            + creationTime
+            + "_"
+            + Integer.toHexString(System.identityHashCode(environment));
 
     treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
     tablePattern = 
TablePattern.parsePipePatternFromSourceParameters(parameters);
@@ -807,11 +817,15 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
     final PersistentResource resource = pendingQueue.poll();
     if (resource == null) {
       return supplyTerminateEvent();
-    } else if (resource instanceof TsFileResource) {
-      return supplyTsFileEvent((TsFileResource) resource);
-    } else {
-      return supplyDeletionEvent((DeletionResource) resource);
     }
+
+    if (resource instanceof TsFileResource) {
+      final TsFileResource tsFileResource = (TsFileResource) resource;
+      return consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)
+          ? supplyProgressReportEvent(tsFileResource.getMaxProgressIndex())
+          : supplyTsFileEvent(tsFileResource);
+    }
+    return supplyDeletionEvent((DeletionResource) resource);
   }
 
   private Event supplyTerminateEvent() {
@@ -834,20 +848,54 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
     return terminateEvent;
   }
 
-  private Event supplyTsFileEvent(final TsFileResource resource) {
-    if (!filteredTsFileResources2TableNames.containsKey(resource)) {
-      final ProgressReportEvent progressReportEvent =
-          new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
-      progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
-      final boolean isReferenceCountIncreased =
-          progressReportEvent.increaseReferenceCount(
-              PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
-      if (!isReferenceCountIncreased) {
+  protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final 
TsFileResource resource) {
+    if (!filteredTsFileResources2TableNames.containsKey(resource)
+        || !shouldSkipHistoricalTsFileEvent(resource)) {
+      return false;
+    }
+
+    filteredTsFileResources2TableNames.remove(resource);
+    LOGGER.info(
+        "Pipe {}@{}: skip historical tsfile {} because realtime source in 
current task {} has already captured it.",
+        pipeName,
+        dataRegionId,
+        resource.getTsFilePath(),
+        tsFileDedupScopeID);
+    try {
+      return true;
+    } finally {
+      try {
+        PipeDataNodeResourceManager.tsfile()
+            .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
+      } catch (final IOException e) {
         LOGGER.warn(
-            "The reference count of the event {} cannot be increased, skipping 
it.",
-            progressReportEvent);
+            "Pipe {}@{}: failed to unpin skipped historical TsFileResource, 
original path: {}",
+            pipeName,
+            dataRegionId,
+            resource.getTsFilePath(),
+            e);
       }
-      return isReferenceCountIncreased ? progressReportEvent : null;
+    }
+  }
+
+  protected Event supplyProgressReportEvent(final ProgressIndex progressIndex) 
{
+    final ProgressReportEvent progressReportEvent =
+        new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
+    progressReportEvent.bindProgressIndex(progressIndex);
+    final boolean isReferenceCountIncreased =
+        progressReportEvent.increaseReferenceCount(
+            PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
+    if (!isReferenceCountIncreased) {
+      LOGGER.warn(
+          "The reference count of the event {} cannot be increased, skipping 
it.",
+          progressReportEvent);
+    }
+    return isReferenceCountIncreased ? progressReportEvent : null;
+  }
+
+  protected Event supplyTsFileEvent(final TsFileResource resource) {
+    if (!filteredTsFileResources2TableNames.containsKey(resource)) {
+      return supplyProgressReportEvent(resource.getMaxProgressIndex());
     }
 
     final PipeTsFileInsertionEvent event =
@@ -916,6 +964,13 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
     }
   }
 
+  private boolean shouldSkipHistoricalTsFileEvent(final TsFileResource 
resource) {
+    return pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
+        && DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+        && PipeTsFileEpochProgressIndexKeeper.getInstance()
+            .containsTsFile(dataRegionId, tsFileDedupScopeID, 
resource.getTsFilePath());
+  }
+
   private Event supplyDeletionEvent(final DeletionResource deletionResource) {
     final PipeDeleteDataNodeEvent event =
         new PipeDeleteDataNodeEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index c9e3f35288a..97b6d54fde5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -83,7 +83,8 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
     if (canNotUseTabletAnymore(event)) {
       event.getTsFileEpoch().migrateState(this, curState -> 
TsFileEpoch.State.USING_TSFILE);
       PipeTsFileEpochProgressIndexKeeper.getInstance()
-          .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
+          .registerProgressIndex(
+              dataRegionId, getTsFileDedupScopeID(), 
event.getTsFileEpoch().getResource());
     } else {
       event
           .getTsFileEpoch()
@@ -156,7 +157,8 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       case USING_TABLET:
         // If the state is USING_TABLET, discard the event
         PipeTsFileEpochProgressIndexKeeper.getInstance()
-            .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
+            .eliminateProgressIndex(
+                dataRegionId, getTsFileDedupScopeID(), 
event.getTsFileEpoch().getFilePath());
         
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
 false);
         return;
       case EMPTY:
@@ -283,7 +285,8 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       PipeDataNodeAgent.runtime()
           .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       PipeTsFileEpochProgressIndexKeeper.getInstance()
-          .eliminateProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getFilePath());
+          .eliminateProgressIndex(
+              dataRegionId, getTsFileDedupScopeID(), 
event.getTsFileEpoch().getFilePath());
       return null;
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 37f1f0a74cb..2ab2061ce7e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -133,6 +133,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
 
   protected String pipeID;
   private String taskID;
+  private String tsFileDedupScopeID;
   protected long userId;
   protected String userName;
   protected String cliHostname;
@@ -226,6 +227,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     creationTime = environment.getCreationTime();
     pipeID = pipeName + "_" + creationTime;
     taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
+    tsFileDedupScopeID = taskID + "_" + 
Integer.toHexString(System.identityHashCode(environment));
 
     treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
     tablePattern = 
TablePattern.parsePipePatternFromSourceParameters(parameters);
@@ -322,6 +324,8 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     if (dataRegionId >= 0) {
       
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, 
this);
       PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this);
+      PipeTsFileEpochProgressIndexKeeper.getInstance()
+          .clearProgressIndex(dataRegionId, tsFileDedupScopeID);
     }
 
     synchronized (isClosed) {
@@ -580,7 +584,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     if (PipeTsFileEpochProgressIndexKeeper.getInstance()
         .isProgressIndexAfterOrEquals(
             dataRegionId,
-            pipeName,
+            tsFileDedupScopeID,
             event.getTsFileEpoch().getFilePath(),
             getProgressIndex4RealtimeEvent(event))) {
       event.skipReportOnCommit();
@@ -652,6 +656,10 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     return taskID;
   }
 
+  public final String getTsFileDedupScopeID() {
+    return tsFileDedupScopeID;
+  }
+
   public void increaseExtractEpochSize() {
     extractEpochSize.incrementAndGet();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 98bfb30391a..97c3138de7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -52,7 +52,8 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
 
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
     PipeTsFileEpochProgressIndexKeeper.getInstance()
-        .registerProgressIndex(dataRegionId, pipeName, 
event.getTsFileEpoch().getResource());
+        .registerProgressIndex(
+            dataRegionId, getTsFileDedupScopeID(), 
event.getTsFileEpoch().getResource());
 
     if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
       
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
@@ -104,7 +105,9 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
             .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
         PipeTsFileEpochProgressIndexKeeper.getInstance()
             .eliminateProgressIndex(
-                dataRegionId, pipeName, 
realtimeEvent.getTsFileEpoch().getFilePath());
+                dataRegionId,
+                getTsFileDedupScopeID(),
+                realtimeEvent.getTsFileEpoch().getFilePath());
       }
 
       realtimeEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 9c7182f051c..bdeebde8938 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -190,6 +190,7 @@ public class PipeDataRegionAssigner implements Closeable {
               if (innerEvent instanceof PipeTsFileInsertionEvent) {
                 final PipeTsFileInsertionEvent tsFileInsertionEvent =
                     (PipeTsFileInsertionEvent) innerEvent;
+                
tsFileInsertionEvent.bindTsFileDedupScopeID(source.getTsFileDedupScopeID());
                 
tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
               }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index bf15dcdc547..aaf03f570e2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -31,36 +31,86 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class PipeTsFileEpochProgressIndexKeeper {
 
-  // data region id -> pipeName -> tsFile path -> max progress index
+  // data region id -> task scope id -> tsFile path -> max progress index
   private final Map<Integer, Map<String, Map<String, TsFileResource>>> 
progressIndexKeeper =
       new ConcurrentHashMap<>();
 
   public synchronized void registerProgressIndex(
-      final int dataRegionId, final String pipeName, final TsFileResource 
resource) {
+      final int dataRegionId, final String taskScopeID, final TsFileResource 
resource) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
         .putIfAbsent(resource.getTsFilePath(), resource);
   }
 
   public synchronized void eliminateProgressIndex(
-      final int dataRegionId, final @Nonnull String pipeName, final String 
filePath) {
-    progressIndexKeeper
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-        .remove(filePath);
+      final int dataRegionId, final @Nonnull String taskScopeID, final String 
filePath) {
+    final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+        progressIndexKeeper.get(dataRegionId);
+    if (scopeProgressIndexKeeper == null) {
+      return;
+    }
+
+    final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+        scopeProgressIndexKeeper.get(taskScopeID);
+    if (tsFileProgressIndexKeeper == null) {
+      return;
+    }
+
+    tsFileProgressIndexKeeper.remove(filePath);
+    if (tsFileProgressIndexKeeper.isEmpty()) {
+      scopeProgressIndexKeeper.remove(taskScopeID);
+      if (scopeProgressIndexKeeper.isEmpty()) {
+        progressIndexKeeper.remove(dataRegionId);
+      }
+    }
+  }
+
+  public synchronized void clearProgressIndex(
+      final int dataRegionId, final @Nonnull String taskScopeID) {
+    final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+        progressIndexKeeper.get(dataRegionId);
+    if (scopeProgressIndexKeeper == null) {
+      return;
+    }
+
+    scopeProgressIndexKeeper.remove(taskScopeID);
+    if (scopeProgressIndexKeeper.isEmpty()) {
+      progressIndexKeeper.remove(dataRegionId);
+    }
+  }
+
+  public synchronized boolean containsTsFile(
+      final int dataRegionId, final @Nonnull String taskScopeID, final String 
tsFilePath) {
+    final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+        progressIndexKeeper.get(dataRegionId);
+    if (scopeProgressIndexKeeper == null) {
+      return false;
+    }
+
+    final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+        scopeProgressIndexKeeper.get(taskScopeID);
+    return tsFileProgressIndexKeeper != null && 
tsFileProgressIndexKeeper.containsKey(tsFilePath);
   }
 
   public synchronized boolean isProgressIndexAfterOrEquals(
       final int dataRegionId,
-      final String pipeName,
+      final String taskScopeID,
       final String tsFilePath,
       final ProgressIndex progressIndex) {
-    return progressIndexKeeper
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-        .entrySet()
-        .stream()
+    final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+        progressIndexKeeper.get(dataRegionId);
+    if (scopeProgressIndexKeeper == null) {
+      return false;
+    }
+
+    final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+        scopeProgressIndexKeeper.get(taskScopeID);
+    if (tsFileProgressIndexKeeper == null) {
+      return false;
+    }
+
+    return tsFileProgressIndexKeeper.entrySet().stream()
         .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
         .map(Entry::getValue)
         .filter(Objects::nonNull)
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
index 5ba0843bf80..db5452e0b92 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
@@ -23,13 +23,17 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.audit.IAuditEntity;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeCompactedTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
 import 
org.apache.iotdb.db.queryengine.plan.relational.security.TreeAccessCheckContext;
@@ -50,8 +54,11 @@ import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.TimeRange;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -157,6 +164,98 @@ public class PipeTsFileInsertionEventTest {
     }
   }
 
+  @Test
+  public void testTsFileDedupScopeIdIsPreservedForCleanupAndCopy() throws 
Exception {
+    final PipeTsFileEpochProgressIndexKeeper keeper =
+        PipeTsFileEpochProgressIndexKeeper.getInstance();
+    final int dataRegionId = 1;
+    final String scopeA = "scope-a";
+    final String scopeB = "scope-b";
+    final File tempDir = 
Files.createTempDirectory("pipeTsFileDedupScope").toFile();
+
+    try {
+      final TsFileResource sourceResource =
+          createSpyTsFileResource(tempDir, "source.tsfile", 1L, dataRegionId);
+      keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource);
+      keeper.registerProgressIndex(dataRegionId, scopeB, sourceResource);
+
+      final PipeTsFileInsertionEvent sourceEvent =
+          new PipeTsFileInsertionEvent(
+                  true,
+                  "db",
+                  sourceResource,
+                  null,
+                  true,
+                  false,
+                  false,
+                  Collections.singleton("table"),
+                  "pipe",
+                  1L,
+                  null,
+                  null,
+                  null,
+                  null,
+                  null,
+                  null,
+                  true,
+                  Long.MIN_VALUE,
+                  Long.MAX_VALUE)
+              .bindTsFileDedupScopeID(scopeA);
+
+      sourceEvent.eliminateProgressIndex();
+      Assert.assertFalse(
+          keeper.containsTsFile(dataRegionId, scopeA, 
sourceResource.getTsFilePath()));
+      Assert.assertTrue(
+          keeper.containsTsFile(dataRegionId, scopeB, 
sourceResource.getTsFilePath()));
+
+      keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource);
+      final PipeTsFileInsertionEvent copiedEvent =
+          sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+              "pipe", 2L, null, null, null, null, null, null, true, 
Long.MIN_VALUE, Long.MAX_VALUE);
+      Assert.assertEquals(scopeA, copiedEvent.getTsFileDedupScopeID());
+      copiedEvent.eliminateProgressIndex();
+      Assert.assertFalse(
+          keeper.containsTsFile(dataRegionId, scopeA, 
sourceResource.getTsFilePath()));
+      Assert.assertTrue(
+          keeper.containsTsFile(dataRegionId, scopeB, 
sourceResource.getTsFilePath()));
+
+      keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource);
+      final TsFileResource compactedResource =
+          createSpyTsFileResource(tempDir, "compacted.tsfile", 2L, 
dataRegionId);
+      final PipeCompactedTsFileInsertionEvent compactedEvent =
+          new PipeCompactedTsFileInsertionEvent(
+              new CommitterKey("pipe", 1L, dataRegionId, 0),
+              Collections.singleton(sourceEvent),
+              sourceEvent,
+              compactedResource,
+              true);
+      Assert.assertEquals(scopeA, compactedEvent.getTsFileDedupScopeID());
+      compactedEvent.eliminateProgressIndex();
+      Assert.assertFalse(
+          keeper.containsTsFile(dataRegionId, scopeA, 
sourceResource.getTsFilePath()));
+      Assert.assertTrue(
+          keeper.containsTsFile(dataRegionId, scopeB, 
sourceResource.getTsFilePath()));
+    } finally {
+      keeper.clearProgressIndex(dataRegionId, scopeA);
+      keeper.clearProgressIndex(dataRegionId, scopeB);
+      FileUtils.deleteFileOrDirectory(tempDir);
+    }
+  }
+
+  private TsFileResource createSpyTsFileResource(
+      final File tempDir, final String fileName, final long flushOrderId, 
final int dataRegionId)
+      throws IOException {
+    final File file = new File(tempDir, fileName);
+    Assert.assertTrue(file.createNewFile());
+
+    final TsFileResource resource = new TsFileResource(file);
+    resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
+
+    final TsFileResource spyResource = Mockito.spy(resource);
+    
Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId();
+    return spyResource;
+  }
+
   static class TestAccessControl implements AccessControl {
 
     @Override
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
new file mode 100644
index 00000000000..14f97ef79d6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.historical;
+
+import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest { // drives supply() through a stubbed subclass
+  /**
+   * The head of the pending queue is a tsfile that the stubbed skip hook reports as already
+   * consumed. The first supply() is asserted to return a ProgressReportEvent without calling
+   * supplyTsFileEvent; the second supply() is asserted to return the configured event for the
+   * next queued resource.
+   */
+  @Test
+  public void 
testSupplyReturnsProgressReportEventAfterSkippingDuplicateHistoricalTsFile()
+      throws Exception {
+    final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source =
+        new TestablePipeHistoricalDataRegionTsFileAndDeletionSource();
+    final Event expectedEvent = new Event() {};
+    final File tempDir = 
Files.createTempDirectory("pipeHistoricalSkipDuplicate").toFile();
+
+    try {
+      final TsFileResource skippedResource = createTsFileResource(tempDir, 
"skip.tsfile");
+      final TsFileResource nextResource = createTsFileResource(tempDir, 
"next.tsfile");
+      // Only skip.tsfile is registered with the stub as an already-consumed duplicate.
+      source.setSkippedTsFilePaths(skippedResource.getTsFilePath());
+      source.setSuppliedEvent(expectedEvent);
+      setPrivateField(source, "hasBeenStarted", true); // private state is injected via reflection
+      setPrivateField(
+          source,
+          "pendingQueue",
+          new ArrayDeque<PersistentResource>(Arrays.asList(skippedResource, 
nextResource)));
+      // First supply(): the duplicate is consumed and no tsfile event is requested for it.
+      Assert.assertTrue(source.supply() instanceof ProgressReportEvent);
+      Assert.assertEquals(
+          Arrays.asList(skippedResource.getTsFilePath()), 
source.getConsumedSkippedTsFilePaths());
+      Assert.assertTrue(source.getSuppliedTsFiles().isEmpty());
+      Assert.assertEquals(1, source.getPendingQueueSize());
+      // Second supply(): next.tsfile goes to supplyTsFileEvent and its event is returned as-is.
+      Assert.assertSame(expectedEvent, source.supply());
+      Assert.assertEquals(Arrays.asList(nextResource.getTsFilePath()), 
source.getSuppliedTsFiles());
+    } finally {
+      FileUtils.deleteFileOrDirectory(tempDir);
+    }
+  }
+  /**
+   * No tsfile is marked as skipped and the stubbed supplyTsFileEvent returns null. A single
+   * supply() is asserted to return that null after passing only the first resource to
+   * supplyTsFileEvent, leaving one resource in the pending queue.
+   */
+  @Test
+  public void testSupplyDoesNotSwallowNonSkippedNullTsFileEvent() throws 
Exception {
+    final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source =
+        new TestablePipeHistoricalDataRegionTsFileAndDeletionSource();
+    final File tempDir = 
Files.createTempDirectory("pipeHistoricalNullSemantics").toFile();
+
+    try {
+      final TsFileResource firstResource = createTsFileResource(tempDir, 
"first.tsfile");
+      final TsFileResource secondResource = createTsFileResource(tempDir, 
"second.tsfile");
+      // The skip set stays empty; every supplyTsFileEvent call will answer with null.
+      source.setSuppliedEvent(null);
+      setPrivateField(source, "hasBeenStarted", true);
+      setPrivateField(
+          source,
+          "pendingQueue",
+          new ArrayDeque<PersistentResource>(Arrays.asList(firstResource, 
secondResource)));
+      // Exactly one resource is taken from the queue and the null result reaches the caller.
+      Assert.assertNull(source.supply());
+      Assert.assertEquals(
+          Arrays.asList(firstResource.getTsFilePath()), 
source.getSuppliedTsFiles());
+      Assert.assertEquals(1, source.getPendingQueueSize());
+    } finally {
+      FileUtils.deleteFileOrDirectory(tempDir);
+    }
+  }
+  /**
+   * Creates a new file named fileName inside tempDir and wraps it in a TsFileResource. The
+   * calling test fails if the file could not be newly created.
+   */
+  private static TsFileResource createTsFileResource(final File tempDir, final 
String fileName)
+      throws IOException {
+    final File file = new File(tempDir, fileName);
+    Assert.assertTrue(file.createNewFile());
+    return new TsFileResource(file);
+  }
+  /**
+   * Writes value into the field called fieldName that is declared directly on
+   * PipeHistoricalDataRegionTsFileAndDeletionSource, using reflection with access checks
+   * disabled. A missing or unwritable field surfaces as ReflectiveOperationException.
+   */
+  private static void setPrivateField(
+      final PipeHistoricalDataRegionTsFileAndDeletionSource source,
+      final String fieldName,
+      final Object value)
+      throws ReflectiveOperationException {
+    final Field field =
+        
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(source, value);
+  }
+  /**
+   * Subclass under test: replaces the skip check and the tsfile-event creation with in-memory
+   * stubs and records which tsfile paths were passed to each of them.
+   */
+  private static class TestablePipeHistoricalDataRegionTsFileAndDeletionSource
+      extends PipeHistoricalDataRegionTsFileAndDeletionSource {
+    /*
+     * skippedTsFilePaths: paths the skip hook answers true for.
+     * consumedSkippedTsFilePaths / suppliedTsFiles: call logs of the two overridden hooks.
+     * suppliedEvent: the value supplyTsFileEvent returns for every resource (may be null).
+     */
+    private final Set<String> skippedTsFilePaths = new HashSet<>();
+    private final List<String> consumedSkippedTsFilePaths = new ArrayList<>();
+    private final List<String> suppliedTsFiles = new ArrayList<>();
+    private Event suppliedEvent;
+    /** Replaces the current set of skipped paths with exactly the given ones. */
+    private void setSkippedTsFilePaths(final String... skippedTsFilePaths) {
+      this.skippedTsFilePaths.clear();
+      this.skippedTsFilePaths.addAll(Arrays.asList(skippedTsFilePaths));
+    }
+
+    private List<String> getConsumedSkippedTsFilePaths() {
+      return consumedSkippedTsFilePaths;
+    }
+
+    private List<String> getSuppliedTsFiles() {
+      return suppliedTsFiles;
+    }
+    /**
+     * Returns the size of the parent's private pendingQueue field, read through reflection and
+     * cast to ArrayDeque. Reflection failures are rethrown as AssertionError.
+     */
+    @Override
+    public int getPendingQueueSize() {
+      try {
+        final Field field =
+            
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredField("pendingQueue");
+        field.setAccessible(true);
+        return ((ArrayDeque<?>) field.get(this)).size();
+      } catch (final ReflectiveOperationException e) {
+        throw new AssertionError(e);
+      }
+    }
+
+    private void setSuppliedEvent(final Event suppliedEvent) {
+      this.suppliedEvent = suppliedEvent;
+    }
+    /**
+     * Stubbed skip check: returns true, and logs the path, only when the resource's tsfile path
+     * is in skippedTsFilePaths; otherwise returns false without logging anything.
+     */
+    @Override
+    protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(
+        final TsFileResource resource) {
+      if (!skippedTsFilePaths.contains(resource.getTsFilePath())) {
+        return false;
+      }
+      consumedSkippedTsFilePaths.add(resource.getTsFilePath());
+      return true;
+    }
+    /** Stubbed event creation: logs the resource's tsfile path and returns suppliedEvent. */
+    @Override
+    protected Event supplyTsFileEvent(final TsFileResource resource) {
+      suppliedTsFiles.add(resource.getTsFilePath());
+      return suppliedEvent;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
new file mode 100644
index 00000000000..4d27fff5d57
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+public class PipeTsFileEpochProgressIndexKeeperTest {
+  /*
+   * Checks that what PipeTsFileEpochProgressIndexKeeper records for one task scope is not
+   * visible through, or removed by, a different task scope of the same data region id.
+   */
+  private static final int DATA_REGION_ID = 1;
+  private static final String TASK_SCOPE_A = "task-scope-a";
+  private static final String TASK_SCOPE_B = "task-scope-b";
+  // Obtained through getInstance(), presumably a shared singleton; tearDown() clears both scopes.
+  private final PipeTsFileEpochProgressIndexKeeper keeper =
+      PipeTsFileEpochProgressIndexKeeper.getInstance();
+
+  private File tempDir;
+  /** Creates a fresh temporary directory that holds the tsfiles of one test. */
+  @Before
+  public void setUp() throws IOException {
+    tempDir = 
Files.createTempDirectory("pipeTsFileEpochProgressIndexKeeper").toFile();
+  }
+  /** Clears both task scopes of DATA_REGION_ID in the keeper and deletes the temp directory. */
+  @After
+  public void tearDown() {
+    keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A);
+    keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_B);
+    FileUtils.deleteFileOrDirectory(tempDir);
+  }
+  /**
+   * A resource registered under scope A is asserted to be found by containsTsFile for scope A
+   * and not found for scope B.
+   */
+  @Test
+  public void testDuplicateTsFileLookupIsScopedByTaskInstance() throws 
IOException {
+    final TsFileResource resource = createTsFileResource("shared.tsfile", 1L);
+
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, resource);
+
+    Assert.assertTrue(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, 
resource.getTsFilePath()));
+    Assert.assertFalse(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, 
resource.getTsFilePath()));
+  }
+  /**
+   * Two resources (flush order 1 and 2) are registered under scope A only. The same
+   * isProgressIndexAfterOrEquals query, made with the second resource's path and
+   * SimpleProgressIndex(1, 2L), is asserted to be true for scope A and false for scope B.
+   */
+  @Test
+  public void testProgressIndexCheckDoesNotLeakAcrossTaskScopes() throws 
IOException {
+    keeper.registerProgressIndex(
+        DATA_REGION_ID, TASK_SCOPE_A, createTsFileResource("1-1-0-0.tsfile", 
1L));
+
+    final TsFileResource comparedResource = 
createTsFileResource("1-2-0-0.tsfile", 2L);
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, 
comparedResource);
+
+    Assert.assertTrue(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID,
+            TASK_SCOPE_A,
+            comparedResource.getTsFilePath(),
+            new SimpleProgressIndex(1, 2L)));
+    Assert.assertFalse(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID,
+            TASK_SCOPE_B,
+            comparedResource.getTsFilePath(),
+            new SimpleProgressIndex(1, 2L)));
+  }
+  /**
+   * One resource is registered per scope, then only scope A is cleared. Scope A's tsfile is
+   * asserted to be gone while scope B's tsfile is asserted to still be present.
+   */
+  @Test
+  public void testClearProgressIndexOnlyRemovesTargetTaskScope() throws 
IOException {
+    final TsFileResource scopeAResource = 
createTsFileResource("scope-a.tsfile", 1L);
+    final TsFileResource scopeBResource = 
createTsFileResource("scope-b.tsfile", 1L);
+
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, scopeAResource);
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_B, scopeBResource);
+
+    keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A);
+
+    Assert.assertFalse(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, 
scopeAResource.getTsFilePath()));
+    Assert.assertTrue(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, 
scopeBResource.getTsFilePath()));
+  }
+  /**
+   * Creates a new file named fileName in the per-test temp directory and returns a
+   * TsFileResource for it whose progress index was updated with
+   * SimpleProgressIndex(1, flushOrderId). The test fails if the file already existed.
+   */
+  private TsFileResource createTsFileResource(final String fileName, final 
long flushOrderId)
+      throws IOException {
+    final File file = new File(tempDir, fileName);
+    Assert.assertTrue(file.createNewFile());
+
+    final TsFileResource resource = new TsFileResource(file);
+    resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
+    return resource;
+  }
+}


Reply via email to