This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 961127fbc5c [To dev/1.3] Subscription: intro poll and prefetch v2 for 
tsfile topic (#15790) (#15865)
961127fbc5c is described below

commit 961127fbc5c038b23ceb6407645c776fa47f3f50
Author: VGalaxies <[email protected]>
AuthorDate: Fri Jul 4 10:07:30 2025 +0800

    [To dev/1.3] Subscription: intro poll and prefetch v2 for tsfile topic 
(#15790) (#15865)
    
    * Subscription: intro poll and prefetch v2 for tsfile topic (#15790)
    
    * fixup! Subscription: intro poll and prefetch v2 for tsfile topic (#15790)
---
 .../it/env/cluster/config/MppCommonConfig.java     |  17 +
 .../env/cluster/config/MppSharedCommonConfig.java  |  19 +
 .../it/env/remote/config/RemoteCommonConfig.java   |  11 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   6 +
 .../it/triple/AbstractSubscriptionTripleIT.java    |   7 +
 .../AbstractSubscriptionRegressionIT.java          |  71 +++-
 .../IoTDBRootPullConsumeTsfileIT.java              |  15 +-
 .../format/IoTDBDBTsfilePullConsumerIT.java        |  18 +-
 .../IoTDBAllTsTsfilePullConsumerIT.java            |  16 +-
 .../IoTDBAllTsfilePullConsumerSnapshotIT.java      |  14 +-
 .../IoTDBPathDeviceTsfilePullConsumerIT.java       |  17 +-
 .../IoTDBTimeTsTsfilePullConsumerIT.java           |  17 +-
 .../IoTDBSnapshotDevicePullConsumerTsfileIT.java   |  16 +-
 .../multi/IoTDBOneConsumerMultiTopicsMixIT.java    |  33 +-
 .../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java |  39 +--
 .../pattern/IoTDBDBPatternPullConsumeTsfileIT.java |  15 +-
 .../IoTDBDevicePatternPullConsumeTsfileIT.java     |  14 +-
 .../IoTDBRootPatternPullConsumeTsfileIT.java       |  12 +-
 .../pattern/IoTDBTSPatternPullConsumeTsfileIT.java |  15 +-
 .../multi/IoTDBOneConsumerMultiTopicsMixIT.java    |   8 +
 .../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java |  11 +
 .../topic/IoTDBDataSet1TopicConsumerSpecialIT.java |   4 +-
 .../db/subscription/broker/SubscriptionBroker.java |  35 +-
 .../broker/SubscriptionPrefetchingQueue.java       | 387 ++++++++++++++++++++-
 .../broker/SubscriptionPrefetchingTabletQueue.java |   7 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |   6 -
 .../SubscriptionPipeEventBatchSegmentLock.java     |  75 ++++
 .../event/batch/SubscriptionPipeEventBatches.java  | 118 ++++---
 .../batch/SubscriptionPipeTsFileEventBatch.java    |  40 +--
 .../receiver/SubscriptionReceiverV1.java           |   2 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |   4 +-
 31 files changed, 785 insertions(+), 284 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index fe4190d7708..47f96590964 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -503,6 +503,23 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
   }
 
   @Override
+  public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+      int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
+    setProperty(
+        "subscription_prefetch_ts_file_batch_max_delay_in_ms",
+        String.valueOf(subscriptionPrefetchTsFileBatchMaxDelayInMs));
+    return this;
+  }
+
+  @Override
+  public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+      int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
+    setProperty(
+        "subscription_prefetch_ts_file_batch_max_size_in_bytes",
+        String.valueOf(subscriptionPrefetchTsFileBatchMaxSizeInBytes));
+    return this;
+  }
+
   public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
     setProperty("subscription_enabled", String.valueOf(subscriptionEnabled));
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index c2ade6eace0..151e13f1290 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -514,6 +514,25 @@ public class MppSharedCommonConfig implements CommonConfig 
{
   }
 
   @Override
+  public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+      int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
+    dnConfig.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+        subscriptionPrefetchTsFileBatchMaxDelayInMs);
+    cnConfig.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+        subscriptionPrefetchTsFileBatchMaxDelayInMs);
+    return this;
+  }
+
+  @Override
+  public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+      int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
+    dnConfig.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+        subscriptionPrefetchTsFileBatchMaxSizeInBytes);
+    cnConfig.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+        subscriptionPrefetchTsFileBatchMaxSizeInBytes);
+    return this;
+  }
+
   public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
     dnConfig.setSubscriptionEnabled(subscriptionEnabled);
     cnConfig.setSubscriptionEnabled(subscriptionEnabled);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 581c5f4345e..9e82aaf9ebf 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -362,6 +362,17 @@ public class RemoteCommonConfig implements CommonConfig {
   }
 
   @Override
+  public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+      int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
+    return this;
+  }
+
+  @Override
+  public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+      int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
+    return this;
+  }
+
   public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
     return this;
   }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 16d50954209..639c6142783 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -160,6 +160,12 @@ public interface CommonConfig {
 
   CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
 
+  CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+      int subscriptionPrefetchTsFileBatchMaxDelayInMs);
+
+  CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+      int subscriptionPrefetchTsFileBatchMaxSizeInBytes);
+
   CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled);
 
   default CommonConfig setDefaultStorageGroupLevel(int 
defaultStorageGroupLevel) {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
index 3b49eb8ed13..ee4a33ca2ed 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
@@ -69,6 +69,13 @@ public abstract class AbstractSubscriptionTripleIT extends 
AbstractSubscriptionI
     sender.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
     receiver1.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
     receiver2.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+
+    // reduce tsfile batch memory usage
+    
sender.getConfig().getCommonConfig().setSubscriptionPrefetchTsFileBatchMaxDelayInMs(500);
+    sender
+        .getConfig()
+        .getCommonConfig()
+        .setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(32 * 1024);
   }
 
   @Override
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
index 0eb5798599d..125195b5a95 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
@@ -57,6 +57,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
 import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
@@ -305,11 +307,6 @@ public abstract class AbstractSubscriptionRegressionIT 
extends AbstractSubscript
     }
   }
 
-  public List<Integer> consume_tsfile_withFileCount(
-      SubscriptionPullConsumer consumer, String device) throws 
InterruptedException {
-    return consume_tsfile(consumer, Collections.singletonList(device));
-  }
-
   public int consume_tsfile(SubscriptionPullConsumer consumer, String device)
       throws InterruptedException {
     return consume_tsfile(consumer, Collections.singletonList(device)).get(0);
@@ -361,15 +358,6 @@ public abstract class AbstractSubscriptionRegressionIT 
extends AbstractSubscript
     return results;
   }
 
-  public void consume_data(SubscriptionPullConsumer consumer)
-      throws TException,
-          IOException,
-          StatementExecutionException,
-          InterruptedException,
-          IoTDBConnectionException {
-    consume_data(consumer, session_dest);
-  }
-
   public void consume_data_await(
       SubscriptionPullConsumer consumer, Session session, 
List<WrappedVoidSupplier> assertions) {
     AWAIT.untilAsserted(
@@ -393,11 +381,64 @@ public abstract class AbstractSubscriptionRegressionIT 
extends AbstractSubscript
   }
 
   public void consume_tsfile_await(
+      SubscriptionPullConsumer consumer,
+      List<String> devices,
+      List<Integer> expected,
+      List<Boolean> allowGte) {
+    final List<AtomicInteger> counters = new ArrayList<>(devices.size());
+    for (int i = 0; i < devices.size(); i++) {
+      counters.add(new AtomicInteger(0));
+    }
+    AWAIT.untilAsserted(
+        () -> {
+          List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+          if (messages.isEmpty()) {
+            session_src.executeNonQueryStatement("flush");
+          }
+          for (final SubscriptionMessage message : messages) {
+            final SubscriptionTsFileHandler tsFileHandler = 
message.getTsFileHandler();
+            try (final TsFileReader tsFileReader = tsFileHandler.openReader()) 
{
+              for (int i = 0; i < devices.size(); i++) {
+                final Path path = new Path(devices.get(i), "s_0", true);
+                final QueryDataSet dataSet =
+                    tsFileReader.query(
+                        
QueryExpression.create(Collections.singletonList(path), null));
+                while (dataSet.hasNext()) {
+                  dataSet.next();
+                  counters.get(i).addAndGet(1);
+                }
+              }
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          consumer.commitSync(messages);
+          for (int i = 0; i < devices.size(); i++) {
+            if (allowGte.get(i)) {
+              assertGte(counters.get(i).get(), expected.get(i));
+            } else {
+              assertEquals(counters.get(i).get(), expected.get(i));
+            }
+          }
+        });
+  }
+
+  public void consume_tsfile_await(
+      SubscriptionPullConsumer consumer, List<String> devices, List<Integer> 
expected) {
+    consume_tsfile_await(
+        consumer,
+        devices,
+        expected,
+        Stream.generate(() -> 
false).limit(devices.size()).collect(Collectors.toList()));
+  }
+
+  public void consume_tsfile_with_file_count_await(
       SubscriptionPullConsumer consumer, List<String> devices, List<Integer> 
expected) {
     final List<AtomicInteger> counters = new ArrayList<>(devices.size());
     for (int i = 0; i < devices.size(); i++) {
       counters.add(new AtomicInteger(0));
     }
+    AtomicInteger onReceived = new AtomicInteger(0);
     AWAIT.untilAsserted(
         () -> {
           List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
@@ -405,6 +446,7 @@ public abstract class AbstractSubscriptionRegressionIT 
extends AbstractSubscript
             session_src.executeNonQueryStatement("flush");
           }
           for (final SubscriptionMessage message : messages) {
+            onReceived.incrementAndGet();
             final SubscriptionTsFileHandler tsFileHandler = 
message.getTsFileHandler();
             try (final TsFileReader tsFileReader = tsFileHandler.openReader()) 
{
               for (int i = 0; i < devices.size(); i++) {
@@ -425,6 +467,7 @@ public abstract class AbstractSubscriptionRegressionIT 
extends AbstractSubscript
           for (int i = 0; i < devices.size(); i++) {
             assertEquals(counters.get(i).get(), expected.get(i));
           }
+          assertEquals(onReceived.get(), expected.get(devices.size()));
         });
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
index 6106a392c66..169cbd7305f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
@@ -39,6 +39,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /***
@@ -134,23 +135,13 @@ public class IoTDBRootPullConsumeTsfileIT extends 
AbstractSubscriptionRegression
     List<String> devices = new ArrayList<>(2);
     devices.add(device);
     devices.add(device2);
-    List<Integer> results = consume_tsfile(consumer, devices);
-    assertEquals(results.get(0), 10);
-    assertEquals(results.get(1), 10);
+    consume_tsfile_await(consumer, devices, Arrays.asList(10, 10));
     consumer.unsubscribe(topicName);
     assertEquals(subs.getSubscriptions(topicName).size(), 0, "unsubscribe:show 
subscriptions");
     consumer.subscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 1, "subscribe again:show 
subscriptions");
     insert_data(1707782400000L, device); // 2024-02-13 08:00:00+08:00
     insert_data(1707782400000L, device2); // 2024-02-13 08:00:00+08:00
-    results = consume_tsfile(consumer, devices);
-    assertEquals(
-        results.get(0),
-        15,
-        "Unsubscribing and then re-subscribing will not retain progress. Full 
synchronization.");
-    assertEquals(
-        results.get(1),
-        15,
-        "Unsubscribing and then re-subscribing will not retain progress. Full 
synchronization.");
+    consume_tsfile_await(consumer, devices, Arrays.asList(15, 15));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java
index aa5b519ca63..a034a408062 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java
@@ -40,6 +40,8 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /***
@@ -129,9 +131,8 @@ public class IoTDBDBTsfilePullConsumerIT extends 
AbstractSubscriptionRegressionI
     //        insert_data(1706659200000L); //2024-01-31 08:00:00+08:00
     insert_data(System.currentTimeMillis());
     // Consumption data
-    List<Integer> results = consume_tsfile_withFileCount(consumer, device);
-    assertEquals(results.get(0), 10);
-    assertEquals(results.get(1), 2, "number of received files");
+    consume_tsfile_with_file_count_await(
+        consumer, Collections.singletonList(device), Arrays.asList(10, 2));
     // Unsubscribe
     consumer.unsubscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after 
unsubscription");
@@ -141,14 +142,7 @@ public class IoTDBDBTsfilePullConsumerIT extends 
AbstractSubscriptionRegressionI
     insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
     // Consumption data: Progress is not retained when re-subscribing after 
cancellation. Full
     // synchronization.
-    results = consume_tsfile_withFileCount(consumer, device);
-    assertEquals(
-        results.get(0),
-        15,
-        "Unsubscribe and resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(
-        results.get(1),
-        3,
-        "Number of received files: After unsubscribing and resubscribing, 
progress is not retained. Full synchronization.");
+    consume_tsfile_with_file_count_await(
+        consumer, Collections.singletonList(device), Arrays.asList(15, 3));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java
index 4d11e63006d..1e70c9a9ac9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
@@ -170,10 +171,8 @@ public class IoTDBAllTsTsfilePullConsumerIT extends 
AbstractSubscriptionRegressi
     paths.add(device);
     paths.add(device2);
     paths.add(database2 + ".d_2");
-    List<Integer> rowCountList = consume_tsfile(consumer, paths);
-    assertGte(rowCountList.get(0), 8);
-    assertEquals(rowCountList.get(1), 0);
-    assertEquals(rowCountList.get(2), 0);
+    consume_tsfile_await(
+        consumer, paths, Arrays.asList(8, 0, 0), Arrays.asList(true, false, 
false));
 
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -187,12 +186,7 @@ public class IoTDBAllTsTsfilePullConsumerIT extends 
AbstractSubscriptionRegressi
 
     // Consumption data: Progress is not retained after unsubscribing and 
resubscribing. Full
     // synchronization.
-    rowCountList = consume_tsfile(consumer, paths);
-    assertGte(
-        rowCountList.get(0),
-        13,
-        "Unsubscribe and then resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(rowCountList.get(1), 0, "Unsubscribe and then resubscribe," + 
device2);
-    assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," + 
database2 + ".d_2");
+    consume_tsfile_await(
+        consumer, paths, Arrays.asList(13, 0, 0), Arrays.asList(true, false, 
false));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java
index e30ea3a5dba..03a4956ce45 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
@@ -171,11 +172,8 @@ public class IoTDBAllTsfilePullConsumerSnapshotIT extends 
AbstractSubscriptionRe
     paths.add(device);
     paths.add(device2);
     paths.add(database2 + ".d_2");
-    List<Integer> rowCountList = consume_tsfile(consumer, paths);
     // Subscribe and write without consuming
-    assertEquals(rowCountList.get(0), 5, "Write without consume after 
subscription");
-    assertEquals(rowCountList.get(1), 0);
-    assertEquals(rowCountList.get(2), 0);
+    consume_tsfile_await(consumer, paths, Arrays.asList(5, 0, 0));
 
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -189,12 +187,6 @@ public class IoTDBAllTsfilePullConsumerSnapshotIT extends 
AbstractSubscriptionRe
 
     // Consumption data: Progress is not retained after unsubscribing and 
re-subscribing. Full
     // synchronization.
-    rowCountList = consume_tsfile(consumer, paths);
-    assertEquals(
-        rowCountList.get(0),
-        10,
-        "Unsubscribe and then resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(rowCountList.get(1), 0, "Unsubscribe and then resubscribe," + 
device2);
-    assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," + 
database2 + ".d_2");
+    consume_tsfile_await(consumer, paths, Arrays.asList(10, 0, 0));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceTsfilePullConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceTsfilePullConsumerIT.java
index 4f94a42abf0..49dae7dd726 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceTsfilePullConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceTsfilePullConsumerIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /***
@@ -160,12 +161,7 @@ public class IoTDBPathDeviceTsfilePullConsumerIT extends 
AbstractSubscriptionReg
     devices.add(device);
     devices.add(device2);
     devices.add(database2 + ".d_2");
-    List<Integer> results = consume_tsfile(consumer, devices);
-
-    assertEquals(results.get(0), 10);
-    assertEquals(results.get(1), 0);
-    assertEquals(results.get(2), 0);
-    assertEquals(results.get(3), 2, "number of received files");
+    consume_tsfile_with_file_count_await(consumer, devices, Arrays.asList(10, 
0, 0, 2));
     // Unsubscribe
     consumer.unsubscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 0, "show subscriptions after 
cancellation");
@@ -176,13 +172,6 @@ public class IoTDBPathDeviceTsfilePullConsumerIT extends 
AbstractSubscriptionReg
     insert_data(1707782400000L, device2); // 2024-02-13 08:00:00+08:00
     // Consumption data: Progress is not retained after unsubscribing and 
re-subscribing. Full
     // synchronization.
-    results = consume_tsfile(consumer, devices);
-    assertEquals(
-        results.get(0),
-        15,
-        "Unsubscribe and resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(results.get(1), 0, "Unsubscribe and then subscribe again," + 
device2);
-    assertEquals(results.get(2), 0, "Subscribe again after cancellation," + 
database2 + ".d_2");
-    assertEquals(results.get(3), 3, "Number of received files: resubscribe 
after unsubscribe");
+    consume_tsfile_with_file_count_await(consumer, devices, Arrays.asList(15, 
0, 0, 3));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsTsfilePullConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsTsfilePullConsumerIT.java
index 5c6c7c363bd..404ac75f99e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsTsfilePullConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsTsfilePullConsumerIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /***
@@ -170,10 +171,8 @@ public class IoTDBTimeTsTsfilePullConsumerIT extends 
AbstractSubscriptionRegress
     paths.add(device2);
     paths.add(device3);
 
-    rowCountList = consume_tsfile(consumer, paths);
-    assertGte(rowCountList.get(0), 8, device);
-    assertEquals(rowCountList.get(1), 0, device2);
-    assertEquals(rowCountList.get(2), 0, device3);
+    consume_tsfile_await(
+        consumer, paths, Arrays.asList(8, 0, 0), Arrays.asList(true, false, 
false));
 
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -190,13 +189,7 @@ public class IoTDBTimeTsTsfilePullConsumerIT extends 
AbstractSubscriptionRegress
     // synchronization.
     System.out.println("TimeTsTsfilePullConsumer src2 filter:" + 
getCount(session_src, sql));
 
-    rowCountList = consume_tsfile(consumer, paths);
-    assertGte(
-        rowCountList.get(0),
-        13,
-        "Unsubscribe and then resubscribe, progress is not retained. Full 
synchronization. Actual="
-            + rowCountList.get(0));
-    assertEquals(rowCountList.get(1), 0, "Unsubscribe and subscribe again," + 
device2);
-    assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," + 
device3);
+    consume_tsfile_await(
+        consumer, paths, Arrays.asList(13, 0, 0), Arrays.asList(true, false, 
false));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerTsfileIT.java
index 2fe3f482764..386ab059545 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerTsfileIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /***
@@ -153,11 +154,7 @@ public class IoTDBSnapshotDevicePullConsumerTsfileIT 
extends AbstractSubscriptio
     devices.add(device);
     devices.add(database + ".d_1");
     devices.add(database2 + ".d_2");
-    List<Integer> rowCounts = consume_tsfile(consumer, devices);
-
-    assertEquals(rowCounts.get(0), 5);
-    assertEquals(rowCounts.get(1), 0);
-    assertEquals(rowCounts.get(2), 0);
+    consume_tsfile_await(consumer, devices, Arrays.asList(5, 0, 0));
 
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -169,13 +166,6 @@ public class IoTDBSnapshotDevicePullConsumerTsfileIT 
extends AbstractSubscriptio
 
     // Consumption data: Progress is not retained after unsubscribing and then 
re-subscribing. Full
     // synchronization.
-    rowCounts = consume_tsfile(consumer, devices);
-    assertEquals(
-        rowCounts.get(0),
-        10,
-        "Unsubscribe and resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(rowCounts.get(1), 0, "Unsubscribe and resubscribe," + 
database + ".d_1");
-    assertEquals(
-        rowCounts.get(2), 0, "Cancel subscription and subscribe again," + 
database2 + ".d_2");
+    consume_tsfile_await(consumer, devices, Arrays.asList(10, 0, 0));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
index bd93bb2adb5..2155d09ca83 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
@@ -26,6 +26,9 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.iotdb.subscription.it.Retry;
+import org.apache.iotdb.subscription.it.RetryRule;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
 import org.apache.thrift.TException;
@@ -41,6 +44,7 @@ import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -52,6 +56,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
@@ -62,6 +67,9 @@ import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2SubscriptionRegressionConsumer.class})
 public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegressionIT {
+
+  @Rule public RetryRule retryRule = new RetryRule();
+
   private static final String database = "root.test.OneConsumerMultiTopicsMix";
   private static final String device = database + ".d_0";
   private String pattern = device + ".s_0";
@@ -88,6 +96,15 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegres
     assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2");
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
@@ -98,6 +115,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegres
     subs.dropTopic(topicName);
     subs.dropTopic(topicName2);
     dropDB(database);
+    schemaList.clear();
     super.tearDown();
   }
 
@@ -116,6 +134,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegres
     session_src.executeNonQueryStatement("flush;");
   }
 
+  @Retry
   @Test
   public void do_test()
       throws InterruptedException,
@@ -156,16 +175,13 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegres
             });
     thread.start();
 
+    AtomicBoolean isClosed = new AtomicBoolean(false);
     AtomicInteger rowCount = new AtomicInteger(0);
     Thread thread2 =
         new Thread(
             () -> {
-              while (true) {
+              while (!isClosed.get()) {
                 List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(10000));
-                if (messages.isEmpty()) {
-                  break;
-                }
-
                 for (final SubscriptionMessage message : messages) {
                   final short messageType = message.getMessageType();
                   if 
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
@@ -213,12 +229,8 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegres
     Thread thread3 =
         new Thread(
             () -> {
-              while (true) {
+              while (!isClosed.get()) {
                 List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(10000));
-                if (messages.isEmpty()) {
-                  break;
-                }
-
                 for (final SubscriptionMessage message : messages) {
                   final short messageType = message.getMessageType();
                   if 
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
@@ -307,6 +319,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegres
         });
     // close
     consumer.close();
+    isClosed.set(true);
     try {
       consumer.subscribe(topicName, topicName2);
     } catch (Exception e) {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
index a28f617f32d..d1b4d38f36e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.iotdb.subscription.it.Retry;
+import org.apache.iotdb.subscription.it.RetryRule;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
 import org.apache.thrift.TException;
@@ -35,14 +37,16 @@ import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /***
  * 1 consumer subscribes to 2 topics: historical data
@@ -50,6 +54,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2SubscriptionRegressionConsumer.class})
 public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionRegressionIT {
+
+  @Rule public RetryRule retryRule = new RetryRule();
+
   private static final String database = 
"root.test.OneConsumerMultiTopicsTsfile";
   private static final String device = database + ".d_0";
   private static List<MeasurementSchema> schemaList = new ArrayList<>();
@@ -103,6 +110,7 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
     subs.dropTopic(topicName2);
     dropDB(database);
     dropDB(database2);
+    schemaList.clear();
     super.tearDown();
   }
 
@@ -121,6 +129,7 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
     session_src.executeNonQueryStatement("flush;");
   }
 
+  @Retry
   @Test
   public void do_test()
       throws InterruptedException,
@@ -147,23 +156,6 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
     subs.getSubscriptions().forEach((System.out::println));
     assertEquals(subs.getSubscriptions().size(), 2, "subscribe then show 
subscriptions");
 
-    final AtomicInteger rowCount = new AtomicInteger();
-    Thread thread1 =
-        new Thread(
-            () -> {
-              List<String> devices = new ArrayList<>(2);
-              devices.add(device);
-              devices.add(device2);
-              try {
-                List<Integer> results = consume_tsfile(consumer, devices);
-                System.out.println(results);
-                rowCount.addAndGet(results.get(0));
-                rowCount.addAndGet(results.get(1));
-              } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-            });
-    thread1.start();
     // Subscribe and then write data
     Thread thread =
         new Thread(
@@ -181,7 +173,6 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
             });
     thread.start();
     thread.join();
-    thread1.join();
 
     System.out.println(
         "src insert " + device + " :" + getCount(session_src, "select 
count(s_0) from " + device));
@@ -190,7 +181,11 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
             + device2
             + " :"
             + getCount(session_src, "select count(s_0) from " + device2));
-    assertEquals(rowCount.get(), 200, "After first consumption");
+    // After first consumption
+    List<String> devices = new ArrayList<>(2);
+    devices.add(device);
+    devices.add(device2);
+    consume_tsfile_await(consumer, devices, Arrays.asList(100, 100));
     // Unsubscribe
     consumer.unsubscribe(topicName);
     System.out.println("###### After cancellation query:");
@@ -200,12 +195,12 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
 
     // Unsubscribe and then write data
     insert_data(System.currentTimeMillis(), device2);
-    int result = consume_tsfile(consumer, device2);
     session_src.executeNonQueryStatement(
         "insert into "
             + device
             + "(time,s_0,s_1)values(1703980800000,3.45,'2023-12-31 
08:00:00+08:00');"); // 2023-12-31 08:00:00+08:00
-    assertEquals(result, 5, "After the second consumption");
+    consume_tsfile_await(
+        consumer, Collections.singletonList(device2), 
Collections.singletonList(5));
 
     // close
     consumer.close();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumeTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumeTsfileIT.java
index 2f4fe734185..fbc355d30e4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumeTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumeTsfileIT.java
@@ -40,6 +40,8 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /***
@@ -151,9 +153,8 @@ public class IoTDBDBPatternPullConsumeTsfileIT extends 
AbstractSubscriptionRegre
     //        insert_data(1706659200000L); //2024-01-31 08:00:00+08:00
     insert_data(System.currentTimeMillis());
     // Consumption data
-    List<Integer> results = consume_tsfile_withFileCount(consumer, device);
-    assertEquals(results.get(0), 10);
-    assertEquals(results.get(1), 3, "number of files received");
+    consume_tsfile_with_file_count_await(
+        consumer, Collections.singletonList(device), Arrays.asList(10, 3));
     // Unsubscribe
     consumer.unsubscribe(topicName);
     assertEquals(
@@ -166,11 +167,7 @@ public class IoTDBDBPatternPullConsumeTsfileIT extends 
AbstractSubscriptionRegre
 
     // Consumption data: Progress is not retained after unsubscribing and 
resubscribing. Full
     // synchronization.
-    results = consume_tsfile_withFileCount(consumer, device);
-    assertEquals(
-        results.get(0),
-        15,
-        "Unsubscribe and then resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(results.get(1), 4, "Number of files received: resubscribe 
after unsubscribe");
+    consume_tsfile_with_file_count_await(
+        consumer, Collections.singletonList(device), Arrays.asList(15, 4));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumeTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumeTsfileIT.java
index 64507390442..dc19e2201f6 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumeTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumeTsfileIT.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /***
@@ -151,10 +152,7 @@ public class IoTDBDevicePatternPullConsumeTsfileIT extends 
AbstractSubscriptionR
     devices.add(device);
     devices.add(device2);
     devices.add(device3);
-    List<Integer> results = consume_tsfile(consumer, devices);
-    assertEquals(results.get(0), 10);
-    assertEquals(results.get(1), 0);
-    assertEquals(results.get(2), 0);
+    consume_tsfile_await(consumer, devices, Arrays.asList(10, 0, 0));
     // Unsubscribe
     consumer.unsubscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after 
unsubscribe");
@@ -164,12 +162,6 @@ public class IoTDBDevicePatternPullConsumeTsfileIT extends 
AbstractSubscriptionR
     insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
     // Consumption data: Progress is not retained after unsubscribing and 
resubscribing. Full
     // synchronization.
-    results = consume_tsfile(consumer, devices);
-    assertEquals(
-        results.get(0),
-        15,
-        "After unsubscribing and resubscribing, progress is not retained. Full 
synchronization.");
-    assertEquals(results.get(1), 0, "Cancel subscription and subscribe again," 
+ device2);
-    assertEquals(results.get(2), 0, "Unsubscribe and subscribe again," + 
device3);
+    consume_tsfile_await(consumer, devices, Arrays.asList(15, 0, 0));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBRootPatternPullConsumeTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBRootPatternPullConsumeTsfileIT.java
index 1ff3750d99d..c9409c5518d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBRootPatternPullConsumeTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBRootPatternPullConsumeTsfileIT.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /***
@@ -131,8 +132,8 @@ public class IoTDBRootPatternPullConsumeTsfileIT extends 
AbstractSubscriptionReg
     //        insert_data(1706659200000L); //2024-01-31 08:00:00+08:00
     insert_data(System.currentTimeMillis());
     // Consumption data
-    List<Integer> results = consume_tsfile_withFileCount(consumer, device);
-    assertEquals(results.get(0), 10, "Number of consumption data rows");
+    consume_tsfile_await(
+        consumer, Collections.singletonList(device), 
Collections.singletonList(10));
     // Unsubscribe
     consumer.unsubscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 0, "show subscriptions after 
unsubscribe");
@@ -142,10 +143,7 @@ public class IoTDBRootPatternPullConsumeTsfileIT extends 
AbstractSubscriptionReg
     insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
     // Consumption data: Progress is not retained after unsubscribing and 
re-subscribing. Full
     // synchronization.
-    results = consume_tsfile_withFileCount(consumer, device);
-    assertEquals(
-        results.get(0),
-        15,
-        "After unsubscribing and resubscribing, progress is not retained. Full 
synchronization.");
+    consume_tsfile_await(
+        consumer, Collections.singletonList(device), 
Collections.singletonList(15));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
index 53729b3d26c..0b60dfd330d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /***
@@ -153,10 +154,7 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends 
AbstractSubscriptionRegre
     devices.add(device);
     devices.add(device2);
     devices.add(database2 + ".d_2");
-    List<Integer> results = consume_tsfile(consumer, devices);
-    assertEquals(results.get(0), 10);
-    assertEquals(results.get(1), 0);
-    assertEquals(results.get(2), 0);
+    consume_tsfile_await(consumer, devices, Arrays.asList(10, 0, 0));
     insert_data(System.currentTimeMillis());
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -167,13 +165,6 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends 
AbstractSubscriptionRegre
     insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
     // Consumption data: Progress is not retained after unsubscribing and 
re-subscribing. Full
     // synchronization.
-    results = consume_tsfile(consumer, devices);
-
-    assertEquals(
-        results.get(0),
-        15,
-        "Unsubscribe and resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(results.get(1), 0, "Subscribe again after unsubscribe," + 
database + ".d_1");
-    assertEquals(results.get(2), 0, "Unsubscribe and then subscribe again," + 
database2 + ".d_2");
+    consume_tsfile_await(consumer, devices, Arrays.asList(15, 0, 0));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
index 3c229d55ddd..d6637e0c351 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
@@ -28,6 +28,8 @@ import 
org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.Retry;
+import org.apache.iotdb.subscription.it.RetryRule;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
 import org.apache.thrift.TException;
@@ -43,6 +45,7 @@ import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -62,6 +65,9 @@ import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2SubscriptionRegressionConsumer.class})
 public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegressionIT {
+
+  @Rule public RetryRule retryRule = new RetryRule();
+
   private static final String database = "root.test.OneConsumerMultiTopicsMix";
   private static final String database2 = "root.OneConsumerMultiTopicsMix";
   private static final String device = database + ".d_0";
@@ -109,6 +115,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegres
     subs.dropTopic(topicName2);
     dropDB(database);
     dropDB(database2);
+    schemaList.clear();
     super.tearDown();
   }
 
@@ -127,6 +134,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends 
AbstractSubscriptionRegres
     session_src.executeNonQueryStatement("flush");
   }
 
+  @Retry
   @Test
   public void do_test()
       throws InterruptedException,
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
index 0fcb1cd94af..35f1409f9ff 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.subscription.consumer.AckStrategy;
 import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import org.apache.iotdb.subscription.it.Retry;
 import org.apache.iotdb.subscription.it.RetryRule;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
@@ -98,6 +99,15 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
     assertTrue(subs.getTopic(topicName).isPresent(), "create show topics");
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
@@ -109,6 +119,7 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
     subs.dropTopic(topicName2);
     dropDB(database);
     dropDB(database2);
+    schemaList.clear();
     super.tearDown();
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java
index 0682d0879e6..c4917bf027c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java
@@ -118,7 +118,7 @@ public class IoTDBDataSet1TopicConsumerSpecialIT extends 
AbstractSubscriptionReg
     consumer.subscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after 
subscription");
     // Consumption data
-    consume_data(consumer);
+    consume_data(consumer, session_dest);
     check_count(4, "select count(`ABH#01`) from " + device, "Consumption 
data:" + pattern);
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -133,7 +133,7 @@ public class IoTDBDataSet1TopicConsumerSpecialIT extends 
AbstractSubscriptionReg
     consumer.subscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after 
re-subscribing");
     // Consumption data
-    consume_data(consumer);
+    consume_data(consumer, session_dest);
     check_count(8, "select count(`ABH#01`) from " + device, "consume data 
again:" + pattern);
     // Unsubscribe
     consumer.unsubscribe(topicName);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 0b434b7b331..cc03f726141 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
 import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
+import org.apache.iotdb.session.subscription.util.PollTimer;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -109,16 +110,30 @@ public class SubscriptionBroker {
     final Map<String, Long> topicNameToIncrements = new HashMap<>();
 
     // Iterate over each sorted topic name and poll the corresponding events
+    int remainingTopicSize = sortedTopicNames.size();
     for (final String topicName : sortedTopicNames) {
       final SubscriptionPrefetchingQueue prefetchingQueue =
           topicNameToPrefetchingQueue.get(topicName);
+      remainingTopicSize -= 1;
+
       // Recheck
       if (Objects.isNull(prefetchingQueue) || prefetchingQueue.isClosed()) {
         continue;
       }
 
       // Poll the event from the prefetching queue
-      final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
+      final SubscriptionEvent event;
+      if (prefetchingQueue instanceof SubscriptionPrefetchingTsFileQueue) {
+        // TODO: current poll timeout is uniform for all candidate topics
+        final PollTimer timer =
+            new PollTimer(
+                System.currentTimeMillis(),
+                SubscriptionAgent.receiver().remainingMs() / Math.max(1, 
remainingTopicSize));
+        event = prefetchingQueue.pollV2(consumerId, timer);
+      } else {
+        // TODO: migrate poll to pollV2
+        event = prefetchingQueue.poll(consumerId);
+      }
       if (Objects.isNull(event)) {
         continue;
       }
@@ -199,10 +214,14 @@ public class SubscriptionBroker {
 
       // Check if the prefetching queue is closed
       if (prefetchingQueue.isClosed()) {
-        LOGGER.warn(
-            "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] is closed",
-            topicName,
-            brokerId);
+        SubscriptionDataNodeResourceManager.log()
+            .schedule(SubscriptionBroker.class, brokerId, topicName)
+            .ifPresent(
+                l ->
+                    l.warn(
+                        "Subscription: prefetching queue bound to topic [{}] 
for consumer group [{}] is closed",
+                        topicName,
+                        brokerId));
         continue;
       }
 
@@ -479,7 +498,11 @@ public class SubscriptionBroker {
                       brokerId));
       return false;
     }
-    return prefetchingQueue.executePrefetch();
+
+    // TODO: migrate executePrefetch to executePrefetchV2
+    return prefetchingQueue instanceof SubscriptionPrefetchingTabletQueue
+        ? prefetchingQueue.executePrefetch()
+        : prefetchingQueue.executePrefetchV2();
   }
 
   public int getPipeEventCount(final String topicName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index fa443eb50f9..3dd9e0d54fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -19,24 +19,30 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
+import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import 
org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
+import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
 import 
org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.session.subscription.util.PollTimer;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.tsfile.utils.Pair;
@@ -45,6 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -105,6 +112,12 @@ public abstract class SubscriptionPrefetchingQueue {
   private volatile boolean isCompleted = false;
   private volatile boolean isClosed = false;
 
+  // for prefetch v2
+  // TODO: make it thread-local for higher throughput
+  private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
+  private volatile RetryableEvent<TabletInsertionEvent> 
currentTabletInsertionEvent;
+  private volatile SubscriptionTsFileToTabletIterator currentToTabletIterator;
+
   public SubscriptionPrefetchingQueue(
       final String brokerId,
       final String topicName,
@@ -148,6 +161,20 @@ public abstract class SubscriptionPrefetchingQueue {
 
     // no need to clean up events in inputPendingQueue, see
     // 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.close
+
+    if (Objects.nonNull(currentToTabletIterator)) {
+      currentToTabletIterator.cleanUp();
+      currentToTabletIterator = null;
+    }
+    if (Objects.nonNull(currentTsFileInsertionEvent)) {
+      ((EnrichedEvent) 
currentTsFileInsertionEvent).clearReferenceCount(this.getClass().getName());
+      currentTsFileInsertionEvent = null;
+    }
+    if (Objects.nonNull(currentTabletInsertionEvent)) {
+      ((EnrichedEvent) currentTabletInsertionEvent.innerEvent)
+          .clearReferenceCount(this.getClass().getName());
+      currentTabletInsertionEvent = null;
+    }
   }
 
   /////////////////////////////////  lock  /////////////////////////////////
@@ -188,7 +215,7 @@ public abstract class SubscriptionPrefetchingQueue {
     }
   }
 
-  public SubscriptionEvent pollInternal(final String consumerId) {
+  private SubscriptionEvent pollInternal(final String consumerId) {
     states.markPollRequest();
 
     if (prefetchingQueue.isEmpty()) {
@@ -258,6 +285,77 @@ public abstract class SubscriptionPrefetchingQueue {
     return null;
   }
 
+  public SubscriptionEvent pollV2(final String consumerId, final PollTimer 
timer) {
+    acquireReadLock();
+    try {
+      return isClosed() ? null : pollInternalV2(consumerId, timer);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
+  private SubscriptionEvent pollInternalV2(final String consumerId, final 
PollTimer timer) {
+    states.markPollRequest();
+
+    // do-while ensures at least one poll
+    do {
+      SubscriptionEvent event;
+      try {
+        if (prefetchingQueue.isEmpty()) {
+          // TODO: concurrent polling of multiple prefetching queues
+          Thread.sleep(100);
+          onEvent();
+        }
+
+        final long size = prefetchingQueue.size();
+        long count = 0;
+
+        while (count++ < size // limit control
+            && Objects.nonNull(
+                event =
+                    prefetchingQueue.poll(
+                        
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
+                        TimeUnit.MILLISECONDS))) {
+          if (event.isCommitted()) {
+            LOGGER.warn(
+                "Subscription: SubscriptionPrefetchingQueue {} poll committed 
event {} from prefetching queue (broken invariant), remove it",
+                this,
+                event);
+            // no need to update inFlightEvents
+            continue;
+          }
+
+          if (!event.pollable()) {
+            LOGGER.warn(
+                "Subscription: SubscriptionPrefetchingQueue {} poll 
non-pollable event {} from prefetching queue (broken invariant), nack and 
remove it",
+                this,
+                event);
+            event.nack(); // now pollable
+            // no need to update inFlightEvents and prefetchingQueue
+            continue;
+          }
+
+          // This operation should be performed before updating inFlightEvents 
to prevent multiple
+          // consumers from consuming the same event.
+          event.recordLastPolledTimestamp(); // now non-pollable
+
+          inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), 
event);
+          event.recordLastPolledConsumerId(consumerId);
+          return event;
+        }
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn(
+            "Subscription: SubscriptionPrefetchingQueue {} interrupted while 
polling events.",
+            this,
+            e);
+      }
+      timer.update();
+    } while (!timer.isExpired());
+
+    return null;
+  }
+
   /////////////////////////////// prefetch ///////////////////////////////
 
   public boolean executePrefetch() {
@@ -283,6 +381,22 @@ public abstract class SubscriptionPrefetchingQueue {
     }
   }
 
+  public boolean executePrefetchV2() {
+    acquireReadLock();
+    try {
+      if (isClosed()) {
+        return false;
+      }
+      reportStateIfNeeded();
+      tryPrefetchV2();
+      remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
+      // always return false
+      return false;
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   private void reportStateIfNeeded() {
     if (System.currentTimeMillis() - lastStateReportTimestamp
         > 
SubscriptionConfig.getInstance().getSubscriptionLogManagerBaseIntervalMs()
@@ -353,7 +467,19 @@ public abstract class SubscriptionPrefetchingQueue {
    * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
    */
   private synchronized void tryPrefetch() {
-    while (!inputPendingQueue.isEmpty()) {
+    while (!inputPendingQueue.isEmpty() || 
Objects.nonNull(currentTabletInsertionEvent)) {
+      if (Objects.nonNull(currentTabletInsertionEvent)) {
+        final RetryableState state = 
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
+        switch (state) {
+          case PREFETCHED:
+            return;
+          case RETRY:
+            continue;
+          case NO_RETRY:
+            break;
+        }
+      }
+
       final Event event = 
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
       if (Objects.isNull(event)) {
         // The event will be null in two cases:
@@ -389,10 +515,16 @@ public abstract class SubscriptionPrefetchingQueue {
       }
 
       if (event instanceof TabletInsertionEvent) {
-        if (onEvent((TabletInsertionEvent) event)) {
-          return;
+        final RetryableState state =
+            onRetryableTabletInsertionEvent(
+                new RetryableEvent<>((TabletInsertionEvent) event, false, 
false));
+        switch (state) {
+          case PREFETCHED:
+            return;
+          case RETRY:
+          case NO_RETRY:
+            continue;
         }
-        continue;
       }
 
       if (event instanceof TsFileInsertionEvent) {
@@ -420,6 +552,173 @@ public abstract class SubscriptionPrefetchingQueue {
     // At this moment, the inputPendingQueue is empty.
   }
 
+  private synchronized void tryPrefetchV2() {
+    if (!prefetchingQueue.isEmpty()) {
+      return;
+    }
+
+    if (Objects.nonNull(currentTsFileInsertionEvent)) {
+      constructToTabletIterator(currentTsFileInsertionEvent);
+      return;
+    }
+
+    if (Objects.nonNull(currentTabletInsertionEvent)) {
+      final RetryableState state = 
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
+      switch (state) {
+        case PREFETCHED:
+        case RETRY:
+          return;
+        case NO_RETRY:
+          break;
+      }
+    }
+
+    if (Objects.nonNull(currentToTabletIterator)) {
+      while (currentToTabletIterator.hasNext() || 
Objects.nonNull(currentTabletInsertionEvent)) {
+        if (Objects.nonNull(currentTabletInsertionEvent)) {
+          final RetryableState state = 
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
+          switch (state) {
+            case PREFETCHED:
+            case RETRY:
+              return;
+            case NO_RETRY:
+              break;
+          }
+        }
+        final RetryableState state =
+            onRetryableTabletInsertionEvent(
+                new RetryableEvent<>(currentToTabletIterator.next(), true, 
true));
+        switch (state) {
+          case PREFETCHED:
+          case RETRY:
+            return;
+          case NO_RETRY:
+            continue;
+        }
+      }
+      currentToTabletIterator.ack();
+      currentToTabletIterator = null;
+    }
+
+    final Event event = 
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
+    if (Objects.isNull(event)) {
+      // The event will be null in two cases:
+      // 1. The inputPendingQueue is empty.
+      // 2. The tsfile event has been deduplicated.
+      return;
+    }
+
+    if (!(event instanceof EnrichedEvent)) {
+      LOGGER.warn(
+          "Subscription: SubscriptionPrefetchingQueue {} only support prefetch 
EnrichedEvent. Ignore {}.",
+          this,
+          event);
+      return;
+    }
+
+    if (event instanceof PipeTerminateEvent) {
+      final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
+      // add mark completed hook
+      terminateEvent.addOnCommittedHook(
+          () -> {
+            markCompleted();
+            return null;
+          });
+      // commit directly
+      ((PipeTerminateEvent) event)
+          
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
+      LOGGER.info(
+          "Subscription: SubscriptionPrefetchingQueue {} commit 
PipeTerminateEvent {}",
+          this,
+          terminateEvent);
+      return;
+    }
+
+    if (event instanceof TabletInsertionEvent) {
+      onRetryableTabletInsertionEvent(
+          new RetryableEvent<>((TabletInsertionEvent) event, false, false));
+      return; // always return here
+    }
+
+    if (event instanceof TsFileInsertionEvent) {
+      if 
(PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent) 
event)) {
+        onEvent((TsFileInsertionEvent) event);
+        return;
+      }
+      if (Objects.nonNull(currentToTabletIterator)) {
+        LOGGER.warn(
+            "Subscription: SubscriptionPrefetchingQueue {} prefetch 
TsFileInsertionEvent when ToTabletIterator is not null (broken invariant). 
Ignore {}.",
+            this,
+            event);
+      } else {
+        constructToTabletIterator((PipeTsFileInsertionEvent) event);
+        return;
+      }
+    }
+
+    // TODO:
+    //  - PipeHeartbeatEvent: ignored? (may affect pipe metrics)
+    //  - UserDefinedEnrichedEvent: ignored?
+    //  - Others: events related to meta sync, safe to ignore
+    LOGGER.info(
+        "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} 
when prefetching.",
+        this,
+        event);
+    ((EnrichedEvent) event)
+        .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), 
false);
+
+    onEvent();
+  }
+
+  private void constructToTabletIterator(final TsFileInsertionEvent event) {
+    currentTsFileInsertionEvent = null;
+    final Iterator<TabletInsertionEvent> tabletInsertionEventsIterator;
+    try {
+      tabletInsertionEventsIterator = 
event.toTabletInsertionEvents().iterator();
+      currentToTabletIterator =
+          new SubscriptionTsFileToTabletIterator(
+              (PipeTsFileInsertionEvent) event, tabletInsertionEventsIterator);
+    } catch (final PipeException e) {
+      LOGGER.warn("Exception {} occurred when {} construct ToTabletIterator", 
this, e, e);
+      currentTsFileInsertionEvent = event;
+    }
+  }
+
+  private RetryableState onRetryableTabletInsertionEvent(
+      final RetryableEvent<TabletInsertionEvent> retryableEvent) {
+    currentTabletInsertionEvent = null;
+    final EnrichedEvent event = (EnrichedEvent) retryableEvent.innerEvent;
+    if (retryableEvent.shouldIncreaseReferenceCount) {
+      if (!event.increaseReferenceCount(this.getClass().getName())) {
+        LOGGER.warn(
+            "Failed to increase reference count for {} when {} on retryable 
TabletInsertionEvent",
+            event,
+            this);
+        currentTabletInsertionEvent = retryableEvent;
+        return RetryableState.RETRY;
+      }
+      retryableEvent.shouldIncreaseReferenceCount = false;
+    }
+    if (retryableEvent.shouldEnrichWithCommitterKeyAndCommitId) {
+      PipeEventCommitManager.getInstance()
+          .enrichWithCommitterKeyAndCommitId(
+              event,
+              currentToTabletIterator.getCreationTime(),
+              currentToTabletIterator.getRegionId());
+      retryableEvent.shouldEnrichWithCommitterKeyAndCommitId = false;
+    }
+    try {
+      return onEvent(retryableEvent.innerEvent)
+          ? RetryableState.PREFETCHED
+          : RetryableState.NO_RETRY;
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Exception occurred when {} on retryable TabletInsertionEvent {}", 
this, event, e);
+      currentTabletInsertionEvent = retryableEvent;
+      return RetryableState.RETRY;
+    }
+  }
+
   /**
    * @return {@code true} if there are subscription events prefetched.
    */
@@ -427,8 +726,9 @@ public abstract class SubscriptionPrefetchingQueue {
 
   /**
    * @return {@code true} if there are subscription events prefetched.
+   * @throws Exception only occur when constructing {@link 
SubscriptionPipeTsFileEventBatch}.
    */
-  protected boolean onEvent(final TabletInsertionEvent event) {
+  protected boolean onEvent(final TabletInsertionEvent event) throws Exception 
{
     return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
   }
 
@@ -761,4 +1061,79 @@ public abstract class SubscriptionPrefetchingQueue {
         }
         return ev;
       };
+
+  /////////////////////////////// tsfile to tablet iteration 
///////////////////////////////
+
+  private static class SubscriptionTsFileToTabletIterator
+      implements Iterator<TabletInsertionEvent> {
+
+    private final PipeTsFileInsertionEvent tsFileInsertionEvent;
+    private final Iterator<TabletInsertionEvent> tabletInsertionEventsIterator;
+
+    private SubscriptionTsFileToTabletIterator(
+        final PipeTsFileInsertionEvent tsFileInsertionEvent,
+        final Iterator<TabletInsertionEvent> tabletInsertionEventsIterator) {
+      this.tsFileInsertionEvent = tsFileInsertionEvent;
+      this.tabletInsertionEventsIterator = tabletInsertionEventsIterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return tabletInsertionEventsIterator.hasNext();
+    }
+
+    @Override
+    public TabletInsertionEvent next() {
+      return tabletInsertionEventsIterator.next();
+    }
+
+    public void ack() {
+      try {
+        tsFileInsertionEvent.close();
+      } catch (final Exception ignored) {
+      }
+      // should not report here
+      tsFileInsertionEvent.decreaseReferenceCount(this.getClass().getName(), 
false);
+    }
+
+    public void cleanUp() {
+      try {
+        tsFileInsertionEvent.close();
+      } catch (final Exception ignored) {
+      }
+      tsFileInsertionEvent.clearReferenceCount(this.getClass().getName());
+    }
+
+    public long getCreationTime() {
+      return tsFileInsertionEvent.getCreationTime();
+    }
+
+    public int getRegionId() {
+      return tsFileInsertionEvent.getRegionId();
+    }
+  }
+
+  /////////////////////////////// retryable Event 
///////////////////////////////
+
+  private static class RetryableEvent<E extends Event> {
+
+    private final E innerEvent;
+    private volatile boolean shouldIncreaseReferenceCount;
+    private volatile boolean shouldEnrichWithCommitterKeyAndCommitId;
+
+    private RetryableEvent(
+        final E innerEvent,
+        final boolean shouldIncreaseReferenceCount,
+        final boolean shouldEnrichWithCommitterKeyAndCommitId) {
+      this.innerEvent = innerEvent;
+      this.shouldIncreaseReferenceCount = shouldIncreaseReferenceCount;
+      this.shouldEnrichWithCommitterKeyAndCommitId = 
shouldEnrichWithCommitterKeyAndCommitId;
+    }
+  }
+
+  private enum RetryableState {
+    RETRY,
+    NO_RETRY,
+    PREFETCHED,
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 6596b26e7d8..755826f4ced 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -193,7 +193,12 @@ public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQ
 
   @Override
   protected boolean onEvent(final TsFileInsertionEvent event) {
-    return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
+    try {
+      return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
+    } catch (final Exception e) {
+      LOGGER.error("Subscription: unexpected exception (broken invariant) {}", 
e, e);
+    }
+    return false;
   }
 
   /////////////////////////////// stringify ///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 6217af0123b..1b29d1d97c3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -19,9 +19,7 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
-import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
-import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -247,10 +245,6 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
 
   @Override
   protected boolean onEvent(final TsFileInsertionEvent event) {
-    if 
(!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent) 
event)) {
-      return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
-    }
-
     final SubscriptionCommitContext commitContext = 
generateSubscriptionCommitContext();
     final SubscriptionEvent ev =
         new SubscriptionEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
new file mode 100644
index 00000000000..194eb5a8fa5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.batch;
+
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** refer to {@link 
org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceSegmentLock} */
+public class SubscriptionPipeEventBatchSegmentLock {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SubscriptionPipeEventBatchSegmentLock.class);
+
+  private static final int SEGMENT_LOCK_MIN_SIZE = 32;
+  private static final int SEGMENT_LOCK_MAX_SIZE = 128;
+
+  private volatile ReentrantLock[] locks;
+
+  private void initIfNecessary() {
+    if (locks == null) {
+      synchronized (this) {
+        if (locks == null) {
+          int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
+          try {
+            lockSegmentSize = 
StorageEngine.getInstance().getAllDataRegionIds().size();
+          } catch (final Exception e) {
+            LOGGER.warn(
+                "Cannot get data region ids, use default lock segment size: 
{}", lockSegmentSize);
+          }
+          lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize);
+          lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize);
+
+          final ReentrantLock[] tmpLocks = new ReentrantLock[lockSegmentSize];
+          for (int i = 0; i < tmpLocks.length; i++) {
+            tmpLocks[i] = new ReentrantLock();
+          }
+
+          // publish this variable
+          locks = tmpLocks;
+        }
+      }
+    }
+  }
+
+  public void lock(final int regionId) {
+    initIfNecessary();
+    locks[regionId % locks.length].lock();
+  }
+
+  public void unlock(final int regionId) {
+    initIfNecessary();
+    locks[regionId % locks.length].unlock();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
index c20dffcd5e9..17debfc4b48 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
@@ -30,9 +30,9 @@ import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
@@ -45,6 +45,7 @@ public class SubscriptionPipeEventBatches {
   protected final long maxBatchSizeInBytes;
 
   private final Map<Integer, SubscriptionPipeEventBatch> regionIdToBatch;
+  private final SubscriptionPipeEventBatchSegmentLock segmentLock;
 
   public SubscriptionPipeEventBatches(
       final SubscriptionPrefetchingQueue prefetchingQueue,
@@ -54,7 +55,8 @@ public class SubscriptionPipeEventBatches {
     this.maxDelayInMs = maxDelayInMs;
     this.maxBatchSizeInBytes = maxBatchSizeInBytes;
 
-    this.regionIdToBatch = new ConcurrentHashMap<>();
+    this.regionIdToBatch = new HashMap<>();
+    this.segmentLock = new SubscriptionPipeEventBatchSegmentLock();
   }
 
   /**
@@ -63,29 +65,25 @@ public class SubscriptionPipeEventBatches {
   public boolean onEvent(final Consumer<SubscriptionEvent> consumer) {
     final AtomicBoolean hasNew = new AtomicBoolean(false);
     for (final int regionId : ImmutableList.copyOf(regionIdToBatch.keySet())) {
-      regionIdToBatch.compute(
-          regionId,
-          (key, batch) -> {
-            if (Objects.isNull(batch)) {
-              return null;
-            }
-
-            try {
-              if (batch.onEvent(consumer)) {
-                hasNew.set(true);
-                return null; // remove this entry
-              }
-              // Seal this batch next time.
-            } catch (final Exception e) {
-              LOGGER.warn("Exception occurred when sealing events from batch 
{}", batch, e);
-              // Seal this batch next time.
-            }
-
-            return batch;
-          });
-
-      if (hasNew.get()) {
-        break;
+      try {
+        segmentLock.lock(regionId);
+        final SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId);
+        if (Objects.isNull(batch)) {
+          continue;
+        }
+        try {
+          if (batch.onEvent(consumer)) {
+            hasNew.set(true);
+          }
+        } catch (final Exception e) {
+          LOGGER.warn("Exception occurred when sealing events from batch {}", 
batch, e);
+        }
+        if (hasNew.get()) {
+          regionIdToBatch.remove(regionId);
+          break;
+        }
+      } finally {
+        segmentLock.unlock(regionId);
       }
     }
 
@@ -96,41 +94,51 @@ public class SubscriptionPipeEventBatches {
    * @return {@code true} if there are subscription events consumed.
    */
   public boolean onEvent(
-      final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent> 
consumer) {
+      final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent> 
consumer)
+      throws Exception {
     final int regionId = event.getCommitterKey().getRegionId();
 
     final AtomicBoolean hasNew = new AtomicBoolean(false);
-    regionIdToBatch.compute(
-        regionId,
-        (key, batch) -> {
-          if (Objects.isNull(batch)) {
-            batch =
-                prefetchingQueue instanceof SubscriptionPrefetchingTabletQueue
-                    ? new SubscriptionPipeTabletEventBatch(
-                        key,
-                        (SubscriptionPrefetchingTabletQueue) prefetchingQueue,
-                        maxDelayInMs,
-                        maxBatchSizeInBytes)
-                    : new SubscriptionPipeTsFileEventBatch(
-                        key,
-                        (SubscriptionPrefetchingTsFileQueue) prefetchingQueue,
-                        maxDelayInMs,
-                        maxBatchSizeInBytes);
-          }
+    try {
+      segmentLock.lock(regionId);
+      SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId);
+      if (Objects.isNull(batch)) {
+        try {
+          batch =
+              prefetchingQueue instanceof SubscriptionPrefetchingTabletQueue
+                  ? new SubscriptionPipeTabletEventBatch(
+                      regionId,
+                      (SubscriptionPrefetchingTabletQueue) prefetchingQueue,
+                      maxDelayInMs,
+                      maxBatchSizeInBytes)
+                  : new SubscriptionPipeTsFileEventBatch(
+                      regionId,
+                      (SubscriptionPrefetchingTsFileQueue) prefetchingQueue,
+                      maxDelayInMs,
+                      maxBatchSizeInBytes);
+        } catch (final Exception e) {
+          LOGGER.warn("Exception occurred when construct new batch", e);
+          throw e; // rethrow exception for retry
+        }
+      }
 
-          try {
-            if (batch.onEvent(event, consumer)) {
-              hasNew.set(true);
-              return null; // remove this entry
-            }
-            // Seal this batch next time.
-          } catch (final Exception e) {
-            LOGGER.warn("Exception occurred when sealing events from batch 
{}", batch, e);
-            // Seal this batch next time.
-          }
+      try {
+        if (batch.onEvent(event, consumer)) {
+          hasNew.set(true);
+        }
+      } catch (final Exception e) {
+        LOGGER.warn("Exception occurred when sealing events from batch {}", 
batch, e);
+      }
 
-          return batch;
-        });
+      if (hasNew.get()) {
+        regionIdToBatch.remove(regionId);
+      } else {
+        regionIdToBatch.put(regionId, batch);
+      }
+
+    } finally {
+      segmentLock.unlock(regionId);
+    }
 
     return hasNew.get();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 15c93d7a5e6..47be439912b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.event.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
-import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
@@ -56,22 +55,12 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
   @Override
   public synchronized void ack() {
     batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
-    enrichedEvents.stream()
-        // only decrease reference count for tsfile event, since we already 
decrease reference count
-        // for tablet event in batch
-        .filter(event -> event instanceof PipeTsFileInsertionEvent)
-        .forEach(event -> 
event.decreaseReferenceCount(this.getClass().getName(), true));
   }
 
   @Override
   public synchronized void cleanUp(final boolean force) {
     // close batch, it includes clearing the reference count of events
     batch.close();
-
-    // clear the reference count of events
-    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
-      enrichedEvent.clearReferenceCount(this.getClass().getName());
-    }
     enrichedEvents.clear();
   }
 
@@ -92,31 +81,10 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
 
   @Override
   protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
-    // TODO: parse tsfile event on the fly like 
SubscriptionPipeTabletEventBatch
-    try {
-      ((PipeTsFileInsertionEvent) event)
-          .consumeTabletInsertionEventsWithRetry(
-              event1 -> {
-                if (!event1.increaseReferenceCount(this.getClass().getName())) 
{
-                  LOGGER.warn(
-                      "SubscriptionPipeTsFileEventBatch: Failed to increase 
the reference count of event {}, skipping it.",
-                      event1.coreReportMessage());
-                } else {
-                  try {
-                    batch.onEvent(event1);
-                  } catch (final Exception ignored) {
-                    // no exceptions will be thrown
-                  }
-                }
-              },
-              "SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent");
-    } finally {
-      try {
-        event.close();
-      } catch (final Exception ignored) {
-        // no exceptions will be thrown
-      }
-    }
+    LOGGER.warn(
+        "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {} 
when batching.",
+        this,
+        event);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index c1e52165b8d..4c8033f1612 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -177,6 +177,8 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     if (Objects.isNull(pollTimer)) {
       return 
SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs();
     }
+    // update timer before fetch remaining ms
+    pollTimer.update();
     return pollTimer.remainingMs();
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 04d2263ca31..556cc581c7e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -333,9 +333,9 @@ public class CommonConfig {
   private float subscriptionCacheMemoryUsagePercentage = 0.2F;
   private int subscriptionSubtaskExecutorMaxThreadNum = 2;
 
-  private int subscriptionPrefetchTabletBatchMaxDelayInMs = 20; // 1s
+  private int subscriptionPrefetchTabletBatchMaxDelayInMs = 20;
   private long subscriptionPrefetchTabletBatchMaxSizeInBytes = MB;
-  private int subscriptionPrefetchTsFileBatchMaxDelayInMs = 1000; // 5s
+  private int subscriptionPrefetchTsFileBatchMaxDelayInMs = 1000;
   private long subscriptionPrefetchTsFileBatchMaxSizeInBytes = 2 * MB;
   private int subscriptionPollMaxBlockingTimeMs = 500;
   private int subscriptionDefaultTimeoutInMs = 10_000; // 10s


Reply via email to