This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 44ed8578073 [To dev/1.3] Pipe: improve progress coverage checks (#17967)
44ed8578073 is described below

commit 44ed8578073028b16cc5706907a96a3df8eb1717
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:02:44 2026 +0800

    [To dev/1.3] Pipe: improve progress coverage checks (#17967)
    
    * Pipe: improve progress coverage checks (#17940)
    
    * Pipe: improve progress coverage checks
    
    * Pipe: address shutdown progress review comments
    
    * Pipe: refine Chinese shutdown progress messages
    
    (cherry picked from commit b33278688c2fc5f8c6f860253204b4e835f16487)
    
    * Fix Java 8 test compatibility
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 125 ++++++++++++++--
 .../PipeHistoricalDataRegionTsFileSource.java      |  88 ++++++------
 .../PipeTsFileEpochProgressIndexKeeper.java        |  80 +++++++++--
 .../iotdb/db/service/DataNodeShutdownHook.java     |   9 +-
 .../PipeHistoricalDataRegionTsFileSourceTest.java  | 123 ++++++++++++++++
 .../PipeTsFileEpochProgressIndexKeeperTest.java    | 160 +++++++++++++++++++++
 .../commons/consensus/index/ProgressIndex.java     |  11 ++
 7 files changed, 524 insertions(+), 72 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 67b9460c15a..09d3aef90c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -99,6 +99,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -626,25 +627,125 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent 
{
 
   ///////////////////////// Shutdown Logic /////////////////////////
 
+  public long getShutdownProgressPersistTimeoutInMs() {
+    return Math.max(
+        1_000L,
+        (long) 
CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS()
+            + 
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS());
+  }
+
+  public boolean persistAllProgressIndex(final long timeoutInMs) {
+    final long normalizedTimeoutInMs = Math.max(1L, timeoutInMs);
+    final long startTime = System.currentTimeMillis();
+    final AtomicBoolean isConfirmed = new AtomicBoolean(false);
+    final Thread persistThread =
+        new Thread(
+            () -> isConfirmed.set(persistAllProgressIndexInternal()),
+            ThreadName.PIPE_RUNTIME_META_SYNCER.getName() + 
"-Shutdown-Persist");
+    persistThread.setDaemon(true);
+
+    LOGGER.info(
+        "Start to persist all pipe progress indexes during shutdown, pipe 
count {}, timeout {} ms.",
+        getPipeCount(),
+        normalizedTimeoutInMs);
+    persistThread.start();
+    try {
+      final long deadlineInMs = startTime + normalizedTimeoutInMs;
+      while (persistThread.isAlive()) {
+        final long remainingTimeInMs = deadlineInMs - 
System.currentTimeMillis();
+        if (remainingTimeInMs <= 0) {
+          break;
+        }
+        persistThread.join(remainingTimeInMs);
+      }
+    } catch (final InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.info("Interrupted while persisting all pipe progress indexes 
during shutdown.");
+      return false;
+    }
+
+    if (persistThread.isAlive()) {
+      LOGGER.warn(
+          "Timed out while persisting all pipe progress indexes during 
shutdown, cost {} ms.",
+          System.currentTimeMillis() - startTime);
+      return false;
+    }
+
+    if (!isConfirmed.get()) {
+      LOGGER.warn(
+          "Failed to persist all pipe progress indexes during shutdown, cost 
{} ms.",
+          System.currentTimeMillis() - startTime);
+    }
+    return isConfirmed.get();
+  }
+
   public void persistAllProgressIndex() {
-    try (final ConfigNodeClient configNodeClient =
-        
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
-      // Send request to some API server
+    persistAllProgressIndex(getShutdownProgressPersistTimeoutInMs());
+  }
+
+  private boolean persistAllProgressIndexInternal() {
+    final long collectStartTime = System.currentTimeMillis();
+    final int pipeCount = getPipeCount();
+    try {
       final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new 
ArrayList<>());
       collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+      final int pipeMetaCount = resp.getPipeMetaList().size();
+      final int pipeMetaSizeInBytes =
+          resp.getPipeMetaList().stream()
+              .filter(Objects::nonNull)
+              .mapToInt(ByteBuffer::remaining)
+              .sum();
+      LOGGER.info(
+          "Collected pipe metas for shutdown progress persist, pipe count {}, 
pipe meta count {}, pipe meta size {} bytes, cost {} ms.",
+          pipeCount,
+          pipeMetaCount,
+          pipeMetaSizeInBytes,
+          System.currentTimeMillis() - collectStartTime);
+
       if (resp.getPipeMetaList().isEmpty()) {
-        return;
+        if (pipeCount != 0) {
+          LOGGER.info(
+              "Collected empty pipe metas during shutdown while pipe count is 
{}.", pipeCount);
+          return false;
+        }
+        return true;
       }
-      final TSStatus result =
-          configNodeClient.pushHeartbeat(
-              IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
-        LOGGER.warn("Failed to persist progress index to configNode, status: 
{}", result);
-      } else {
-        LOGGER.info("Successfully persisted all pipe's info to configNode.");
+
+      try (final ConfigNodeClient configNodeClient =
+          
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
+        LOGGER.info(
+            "Start to push heartbeat shutdown pipe meta to ConfigNode, data 
node id {}, pipe count {}, pipe meta count {}, pipe meta size {} bytes.",
+            IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+            pipeCount,
+            pipeMetaCount,
+            pipeMetaSizeInBytes);
+        final long pushStartTime = System.currentTimeMillis();
+        final TSStatus result =
+            configNodeClient.pushHeartbeat(
+                IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), 
resp);
+        final long pushCostTime = System.currentTimeMillis() - pushStartTime;
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
+          LOGGER.warn("Failed to persist progress index to ConfigNode, status: 
{}", result);
+          LOGGER.warn(
+              "Failed to push heartbeat shutdown pipe meta to ConfigNode, 
status {}, cost {} ms.",
+              result,
+              pushCostTime);
+          return false;
+        } else {
+          LOGGER.info(
+              "Successfully finished pushing heartbeat shutdown pipe meta to 
ConfigNode, pipe count {}, pipe meta count {}, pipe meta size {} bytes, cost {} 
ms.",
+              pipeCount,
+              pipeMetaCount,
+              pipeMetaSizeInBytes,
+              pushCostTime);
+          LOGGER.info("Successfully persisted all pipe's info to ConfigNode.");
+          return true;
+        }
       }
     } catch (final Exception e) {
-      LOGGER.warn(e.getMessage());
+      LOGGER.warn(
+          "Exception occurred while persisting all pipe progress indexes 
during shutdown.", e);
+      return false;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index 54597cb1bcd..09b121bfae8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -372,53 +372,14 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
         final Collection<TsFileResource> sequenceTsFileResources =
             tsFileManager.getTsFileList(true).stream()
                 .peek(originalResourceList::add)
-                .filter(
-                    resource ->
-                        isHistoricalSourceEnabled
-                            &&
-                            // Some resource is marked as deleted but not 
removed from the list.
-                            !resource.isDeleted()
-                            // Some resource is generated by pipe. We ignore 
them if the pipe should
-                            // not transfer pipe requests.
-                            && (!resource.isGeneratedByPipe() || 
isForwardingPipeRequests)
-                            && (
-                            // If the tsFile is not already marked closing, it 
is not captured by
-                            // the pipe realtime module. Thus, we can wait for 
the realtime sync
-                            // module to handle this, to avoid blocking the 
pipe sync process.
-                            !resource.isClosed()
-                                    && 
Optional.ofNullable(resource.getProcessor())
-                                        
.map(TsFileProcessor::alreadyMarkedClosing)
-                                        .orElse(true)
-                                || mayTsFileContainUnprocessedData(resource)
-                                    && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                                    && 
mayTsFileResourceOverlappedWithPattern(resource)))
+                .filter(this::shouldExtractTsFileResource)
                 .collect(Collectors.toList());
-        filteredTsFileResources.addAll(sequenceTsFileResources);
-
         final Collection<TsFileResource> unSequenceTsFileResources =
             tsFileManager.getTsFileList(false).stream()
                 .peek(originalResourceList::add)
-                .filter(
-                    resource ->
-                        isHistoricalSourceEnabled
-                            &&
-                            // Some resource is marked as deleted but not 
removed from the list.
-                            !resource.isDeleted()
-                            // Some resource is generated by pipe. We ignore 
them if the pipe should
-                            // not transfer pipe requests.
-                            && (!resource.isGeneratedByPipe() || 
isForwardingPipeRequests)
-                            && (
-                            // If the tsFile is not already marked closing, it 
is not captured by
-                            // the pipe realtime module. Thus, we can wait for 
the realtime sync
-                            // module to handle this, to avoid blocking the 
pipe sync process.
-                            !resource.isClosed()
-                                    && 
Optional.ofNullable(resource.getProcessor())
-                                        
.map(TsFileProcessor::alreadyMarkedClosing)
-                                        .orElse(true)
-                                || mayTsFileContainUnprocessedData(resource)
-                                    && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                                    && 
mayTsFileResourceOverlappedWithPattern(resource)))
+                .filter(this::shouldExtractTsFileResource)
                 .collect(Collectors.toList());
+        filteredTsFileResources.addAll(sequenceTsFileResources);
         filteredTsFileResources.addAll(unSequenceTsFileResources);
 
         filteredTsFileResources.removeIf(
@@ -464,6 +425,46 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
     }
   }
 
+  private boolean shouldExtractTsFileResource(final TsFileResource resource) {
+    if (!isHistoricalSourceEnabled) {
+      return false;
+    }
+
+    // Some resource is marked as deleted but not removed from the list.
+    if (resource.isDeleted()) {
+      return false;
+    }
+
+    // Some resource is generated by pipe. We ignore them if the pipe should 
not transfer pipe
+    // requests.
+    if (resource.isGeneratedByPipe() && !isForwardingPipeRequests) {
+      return false;
+    }
+
+    // Some resource may not be closed due to the control of 
PIPE_MIN_FLUSH_INTERVAL_IN_MS. We
+    // simply ignore them.
+    if (!resource.isClosed()
+        && Optional.ofNullable(resource.getProcessor())
+            .map(TsFileProcessor::alreadyMarkedClosing)
+            .orElse(true)) {
+      return true;
+    }
+
+    if (!mayTsFileContainUnprocessedData(resource)) {
+      return false;
+    }
+
+    if (!isTsFileResourceOverlappedWithTimeRange(resource)) {
+      return false;
+    }
+
+    if (!mayTsFileResourceOverlappedWithPattern(resource)) {
+      return false;
+    }
+
+    return true;
+  }
+
   private boolean mayTsFileContainUnprocessedData(final TsFileResource 
resource) {
     if (startIndex instanceof TimeWindowStateProgressIndex) {
       // The resource is closed thus the TsFileResource#getFileEndTime() is 
safe to use
@@ -474,8 +475,7 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
       startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
     }
 
-    if (!startIndex.isAfter(resource.getMaxProgressIndex())
-        && !startIndex.equals(resource.getMaxProgressIndex())) {
+    if (!startIndex.isEqualOrAfter(resource.getMaxProgressIndex())) {
       LOGGER.info(
           "Pipe {}@{}: file {} meets mayTsFileContainUnprocessedData 
condition, extractor progressIndex: {}, resource ProgressIndex: {}",
           pipeName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index ff7d90c377d..e1ebe178dd0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -31,40 +31,90 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class PipeTsFileEpochProgressIndexKeeper {
 
-  // data region id -> pipeName -> tsFile path -> max progress index
+  // data region id -> task scope id -> tsFile path -> max progress index
   private final Map<String, Map<String, Map<String, TsFileResource>>> 
progressIndexKeeper =
       new ConcurrentHashMap<>();
 
   public synchronized void registerProgressIndex(
-      final String dataRegionId, final String pipeName, final TsFileResource 
resource) {
+      final String dataRegionId, final String taskScopeID, final 
TsFileResource resource) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
         .putIfAbsent(resource.getTsFilePath(), resource);
   }
 
   public synchronized void eliminateProgressIndex(
-      final String dataRegionId, final @Nonnull String pipeName, final String 
filePath) {
-    progressIndexKeeper
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-        .remove(filePath);
+      final String dataRegionId, final @Nonnull String taskScopeID, final 
String filePath) {
+    final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+        progressIndexKeeper.get(dataRegionId);
+    if (scopeProgressIndexKeeper == null) {
+      return;
+    }
+
+    final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+        scopeProgressIndexKeeper.get(taskScopeID);
+    if (tsFileProgressIndexKeeper == null) {
+      return;
+    }
+
+    tsFileProgressIndexKeeper.remove(filePath);
+    if (tsFileProgressIndexKeeper.isEmpty()) {
+      scopeProgressIndexKeeper.remove(taskScopeID);
+      if (scopeProgressIndexKeeper.isEmpty()) {
+        progressIndexKeeper.remove(dataRegionId);
+      }
+    }
+  }
+
+  public synchronized void clearProgressIndex(
+      final String dataRegionId, final @Nonnull String taskScopeID) {
+    final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+        progressIndexKeeper.get(dataRegionId);
+    if (scopeProgressIndexKeeper == null) {
+      return;
+    }
+
+    scopeProgressIndexKeeper.remove(taskScopeID);
+    if (scopeProgressIndexKeeper.isEmpty()) {
+      progressIndexKeeper.remove(dataRegionId);
+    }
+  }
+
+  public synchronized boolean containsTsFile(
+      final String dataRegionId, final @Nonnull String taskScopeID, final 
String tsFilePath) {
+    final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+        progressIndexKeeper.get(dataRegionId);
+    if (scopeProgressIndexKeeper == null) {
+      return false;
+    }
+
+    final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+        scopeProgressIndexKeeper.get(taskScopeID);
+    return tsFileProgressIndexKeeper != null && 
tsFileProgressIndexKeeper.containsKey(tsFilePath);
   }
 
   public synchronized boolean isProgressIndexAfterOrEquals(
       final String dataRegionId,
-      final String pipeName,
+      final String taskScopeID,
       final String tsFilePath,
       final ProgressIndex progressIndex) {
-    return progressIndexKeeper
-        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
-        .entrySet()
-        .stream()
+    final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+        progressIndexKeeper.get(dataRegionId);
+    if (scopeProgressIndexKeeper == null) {
+      return false;
+    }
+
+    final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+        scopeProgressIndexKeeper.get(taskScopeID);
+    if (tsFileProgressIndexKeeper == null) {
+      return false;
+    }
+
+    return tsFileProgressIndexKeeper.entrySet().stream()
         .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
         .map(Entry::getValue)
         .filter(Objects::nonNull)
-        .anyMatch(resource -> 
!resource.getMaxProgressIndex().isAfter(progressIndex));
+        .anyMatch(resource -> 
progressIndex.isEqualOrAfter(resource.getMaxProgressIndex()));
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 4d0f4a5a412..76966820abb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -148,7 +148,14 @@ public class DataNodeShutdownHook extends Thread {
       }
     }
     // Persist progress index before shutdown to accurate recovery after 
restart
-    PipeDataNodeAgent.task().persistAllProgressIndex();
+    final long shutdownProgressPersistTimeoutInMs =
+        PipeDataNodeAgent.task().getShutdownProgressPersistTimeoutInMs();
+    logger.info(
+        "Persisting pipe progress indexes before shutdown, timeout {} ms.",
+        shutdownProgressPersistTimeoutInMs);
+    if 
(!PipeDataNodeAgent.task().persistAllProgressIndex(shutdownProgressPersistTimeoutInMs))
 {
+      logger.warn("Pipe progress indexes were not confirmed during shutdown.");
+    }
 
     // Actually stop all services started by the DataNode.
     // If we don't call this, services like the RestService are not stopped 
and I can't re-start
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSourceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSourceTest.java
new file mode 100644
index 00000000000..3b1396de5c1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSourceTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.historical;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+
+public class PipeHistoricalDataRegionTsFileSourceTest {
+
+  @Test
+  public void testMayTsFileContainUnprocessedDataUsesEqualOrAfterCoverage() 
throws Exception {
+    final File tempDir = 
Files.createTempDirectory("pipeHistoricalProgressCoverage").toFile();
+
+    try {
+      assertMayTsFileContainUnprocessedData(
+          tempDir,
+          "superset.tsfile",
+          hybridProgressIndex(
+              new IoTProgressIndex(ImmutableMap.of(1, 100L, 2, 200L)),
+              new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+          hybridProgressIndex(
+              new IoTProgressIndex(1, 100L),
+              new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 9))),
+          false);
+
+      assertMayTsFileContainUnprocessedData(
+          tempDir,
+          "missing-dimension.tsfile",
+          hybridProgressIndex(new IoTProgressIndex(1, 100L)),
+          hybridProgressIndex(
+              new IoTProgressIndex(1, 90L),
+              new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+          true);
+    } finally {
+      FileUtils.deleteFileOrDirectory(tempDir);
+    }
+  }
+
+  private static void assertMayTsFileContainUnprocessedData(
+      final File tempDir,
+      final String fileName,
+      final ProgressIndex startIndex,
+      final ProgressIndex resourceProgressIndex,
+      final boolean expected)
+      throws Exception {
+    Assert.assertEquals(!expected, 
startIndex.isEqualOrAfter(resourceProgressIndex));
+
+    final PipeHistoricalDataRegionTsFileSource source = new 
PipeHistoricalDataRegionTsFileSource();
+    setPrivateField(source, "pipeName", "pipe");
+    setPrivateField(source, "dataRegionId", 1);
+    setPrivateField(source, "startIndex", startIndex);
+
+    final Method method =
+        PipeHistoricalDataRegionTsFileSource.class.getDeclaredMethod(
+            "mayTsFileContainUnprocessedData", TsFileResource.class);
+    method.setAccessible(true);
+    Assert.assertEquals(
+        expected,
+        method.invoke(
+            source, createClosedTsFileResource(tempDir, fileName, 
resourceProgressIndex)));
+  }
+
+  private static TsFileResource createClosedTsFileResource(
+      final File tempDir, final String fileName, final ProgressIndex 
progressIndex)
+      throws Exception {
+    final File file = new File(tempDir, fileName);
+    Assert.assertTrue(file.createNewFile());
+
+    final TsFileResource resource = new TsFileResource(file);
+    resource.setStatusForTest(TsFileResourceStatus.NORMAL);
+    resource.updateProgressIndex(progressIndex);
+    return resource;
+  }
+
+  private static ProgressIndex hybridProgressIndex(
+      final ProgressIndex firstProgressIndex, final ProgressIndex... 
progressIndexes) {
+    ProgressIndex result = new HybridProgressIndex(firstProgressIndex);
+    for (final ProgressIndex progressIndex : progressIndexes) {
+      result = 
result.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+    }
+    return result;
+  }
+
+  private static void setPrivateField(
+      final PipeHistoricalDataRegionTsFileSource source, final String 
fieldName, final Object value)
+      throws ReflectiveOperationException {
+    final Field field = 
PipeHistoricalDataRegionTsFileSource.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(source, value);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
new file mode 100644
index 00000000000..bfd45b708b6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+public class PipeTsFileEpochProgressIndexKeeperTest {
+
+  private static final String DATA_REGION_ID = "1";
+  private static final String TASK_SCOPE_A = "task-scope-a";
+  private static final String TASK_SCOPE_B = "task-scope-b";
+
+  private final PipeTsFileEpochProgressIndexKeeper keeper =
+      PipeTsFileEpochProgressIndexKeeper.getInstance();
+
+  private File tempDir;
+
+  @Before
+  public void setUp() throws IOException {
+    tempDir = 
Files.createTempDirectory("pipeTsFileEpochProgressIndexKeeper").toFile();
+  }
+
+  @After
+  public void tearDown() {
+    keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A);
+    keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_B);
+    FileUtils.deleteFileOrDirectory(tempDir);
+  }
+
+  @Test
+  public void testDuplicateTsFileLookupIsScopedByTaskInstance() throws 
IOException {
+    final TsFileResource resource = createTsFileResource("shared.tsfile", 1L);
+
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, resource);
+
+    Assert.assertTrue(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, 
resource.getTsFilePath()));
+    Assert.assertFalse(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, 
resource.getTsFilePath()));
+  }
+
+  @Test
+  public void testProgressIndexCheckDoesNotLeakAcrossTaskScopes() throws 
IOException {
+    keeper.registerProgressIndex(
+        DATA_REGION_ID, TASK_SCOPE_A, createTsFileResource("1-1-0-0.tsfile", 
1L));
+
+    final TsFileResource comparedResource = 
createTsFileResource("1-2-0-0.tsfile", 2L);
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, 
comparedResource);
+
+    Assert.assertTrue(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID,
+            TASK_SCOPE_A,
+            comparedResource.getTsFilePath(),
+            new SimpleProgressIndex(1, 2L)));
+    Assert.assertFalse(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID,
+            TASK_SCOPE_B,
+            comparedResource.getTsFilePath(),
+            new SimpleProgressIndex(1, 2L)));
+  }
+
+  @Test
+  public void testProgressIndexCheckUsesEqualOrAfterCoverage() throws 
IOException {
+    final ProgressIndex registeredProgressIndex =
+        hybridProgressIndex(
+            new IoTProgressIndex(1, 90L),
+            new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10)));
+    keeper.registerProgressIndex(
+        DATA_REGION_ID,
+        TASK_SCOPE_A,
+        createTsFileResource("registered-hybrid.tsfile", 
registeredProgressIndex));
+
+    Assert.assertFalse(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID, TASK_SCOPE_A, "current.tsfile", new 
IoTProgressIndex(1, 100L)));
+
+    Assert.assertTrue(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID,
+            TASK_SCOPE_A,
+            "current.tsfile",
+            hybridProgressIndex(
+                new IoTProgressIndex(1, 100L),
+                new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 
10)))));
+  }
+
+  @Test
+  public void testClearProgressIndexOnlyRemovesTargetTaskScope() throws 
IOException {
+    final TsFileResource scopeAResource = 
createTsFileResource("scope-a.tsfile", 1L);
+    final TsFileResource scopeBResource = 
createTsFileResource("scope-b.tsfile", 1L);
+
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, scopeAResource);
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_B, scopeBResource);
+
+    keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A);
+
+    Assert.assertFalse(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, 
scopeAResource.getTsFilePath()));
+    Assert.assertTrue(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, 
scopeBResource.getTsFilePath()));
+  }
+
+  private TsFileResource createTsFileResource(final String fileName, final 
long flushOrderId)
+      throws IOException {
+    return createTsFileResource(fileName, new SimpleProgressIndex(1, 
flushOrderId));
+  }
+
+  private TsFileResource createTsFileResource(
+      final String fileName, final ProgressIndex progressIndex) throws 
IOException {
+    final File file = new File(tempDir, fileName);
+    Assert.assertTrue(file.createNewFile());
+
+    final TsFileResource resource = new TsFileResource(file);
+    resource.updateProgressIndex(progressIndex);
+    return resource;
+  }
+
+  private ProgressIndex hybridProgressIndex(
+      final ProgressIndex firstProgressIndex, final ProgressIndex... 
progressIndexes) {
+    ProgressIndex result = new HybridProgressIndex(firstProgressIndex);
+    for (final ProgressIndex progressIndex : progressIndexes) {
+      result = 
result.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+    }
+    return result;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 3c8d13bab54..979eee0c8db 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -116,6 +116,17 @@ public abstract class ProgressIndex implements Accountable 
{
     return super.hashCode();
   }
 
+  /**
+   * A.isEqualOrAfter(B) is true if and only if A already covers B in every 
tuple member. In other
+   * words, blending B into A does not advance A.
+   *
+   * @param progressIndex the progress index to be compared
+   * @return true if and only if this progress index is equal to or after the 
given progress index
+   */
+  public final boolean isEqualOrAfter(@Nonnull final ProgressIndex 
progressIndex) {
+    return 
updateToMinimumEqualOrIsAfterProgressIndex(progressIndex).equals(this);
+  }
+
   /**
    * Define the isEqualOrAfter relation, A.isEqualOrAfter(B) if and only if 
each tuple member in A
    * is greater than or equal to B in the corresponding total order relation.

Reply via email to