This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 444f57b5ac3 Subscription: apply IoTConsensusV2 as cluster mode for 
integration test (#15546)
444f57b5ac3 is described below

commit 444f57b5ac3a395279c565f5e93ca4cf5856eade
Author: VGalaxies <[email protected]>
AuthorDate: Thu May 29 17:27:53 2025 +0800

    Subscription: apply IoTConsensusV2 as cluster mode for integration test 
(#15546)
---
 .github/workflows/pipe-it.yml                      |  6 +-
 .../it/IoTDBSubscriptionITConstant.java            | 28 ++++++++
 .../AbstractSubscriptionTreeRegressionIT.java      | 82 ++++++++++++++++------
 .../IoTDBDefaultTsfilePushConsumerIT.java          | 11 +++
 .../IoTDBRootPullConsumeTsfileIT.java              | 10 +++
 .../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 10 +++
 .../IoTDBDevicePatternPullConsumerDataSetIT.java   | 31 +++++---
 ...IoTDBMiddleMatchPatternPullConsumeTsfileIT.java | 28 ++++----
 ...oTDBSnapshotTSPatternDatasetPushConsumerIT.java | 14 +++-
 .../multi/IoTDBMultiGroupVsMultiConsumerIT.java    | 10 +++
 10 files changed, 183 insertions(+), 47 deletions(-)

diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml
index ba09fb4e27e..2c1b03623d1 100644
--- a/.github/workflows/pipe-it.yml
+++ b/.github/workflows/pipe-it.yml
@@ -438,7 +438,7 @@ jobs:
       matrix:
         java: [ 17 ]
         # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1: [ ScalableSingleNodeMode ]
+        cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, 
PipeConsensusStreamMode ]
         cluster2: [ ScalableSingleNodeMode ]
         os: [ ubuntu-latest ]
     runs-on: ${{ matrix.os }}
@@ -606,7 +606,7 @@ jobs:
       matrix:
         java: [ 17 ]
         # do not use HighPerformanceMode here, otherwise some tests will cause 
the GH runner to receive a shutdown signal
-        cluster1: [ ScalableSingleNodeMode ]
+        cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, 
PipeConsensusStreamMode ]
         cluster2: [ ScalableSingleNodeMode ]
         os: [ ubuntu-latest ]
     runs-on: ${{ matrix.os }}
@@ -690,7 +690,7 @@ jobs:
       matrix:
         java: [ 17 ]
         # do not use HighPerformanceMode here, otherwise some tests will cause 
the GH runner to receive a shutdown signal
-        cluster1: [ ScalableSingleNodeMode ]
+        cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, 
PipeConsensusStreamMode ]
         cluster2: [ ScalableSingleNodeMode ]
         os: [ ubuntu-latest ]
     runs-on: ${{ matrix.os }}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index 5b8ec393274..3162139fb66 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -19,10 +19,15 @@
 
 package org.apache.iotdb.subscription.it;
 
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.session.Session;
+
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionFactory;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 public class IoTDBSubscriptionITConstant {
 
@@ -40,4 +45,27 @@ public class IoTDBSubscriptionITConstant {
 
   public static final long SLEEP_NS = 1_000_000_000L;
   public static final long POLL_TIMEOUT_MS = 10_000L;
+
+  @FunctionalInterface
+  public interface WrappedVoidSupplier {
+    void get() throws Throwable;
+  }
+
+  public static void AWAIT_WITH_FLUSH(final Session session, final 
WrappedVoidSupplier assertions) {
+    AWAIT.untilAsserted(
+        () -> {
+          session.executeNonQueryStatement("flush");
+          assertions.get();
+        });
+  }
+
+  public static Consumer<BaseEnv> FORCE_SCALABLE_SINGLE_NODE_MODE =
+      env ->
+          env.getConfig()
+              .getCommonConfig()
+              
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+              
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+              
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+              .setSchemaReplicationFactor(1)
+              .setDataReplicationFactor(1);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java
index 4b0164801cf..288f202ee96 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.session.subscription.SubscriptionTreeSession;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.WrappedVoidSupplier;
 import org.apache.iotdb.subscription.it.triple.AbstractSubscriptionTripleIT;
 
 import org.apache.thrift.TException;
@@ -57,6 +58,7 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
 import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
 
 public abstract class AbstractSubscriptionTreeRegressionIT extends 
AbstractSubscriptionTripleIT {
@@ -359,26 +361,6 @@ public abstract class AbstractSubscriptionTreeRegressionIT 
extends AbstractSubsc
     return results;
   }
 
-  public static void consume_data_long(
-      SubscriptionTreePullConsumer consumer, Session session, Long timeout)
-      throws StatementExecutionException, InterruptedException, 
IoTDBConnectionException {
-    timeout = System.currentTimeMillis() + timeout;
-    while (System.currentTimeMillis() < timeout) {
-      List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
-      if (messages.isEmpty()) {
-        Thread.sleep(1000);
-      }
-      for (final SubscriptionMessage message : messages) {
-        for (final Iterator<Tablet> it = 
message.getSessionDataSetsHandler().tabletIterator();
-            it.hasNext(); ) {
-          final Tablet tablet = it.next();
-          session.insertTablet(tablet);
-        }
-      }
-      consumer.commitSync(messages);
-    }
-  }
-
   public void consume_data(SubscriptionTreePullConsumer consumer)
       throws TException,
           IOException,
@@ -388,6 +370,66 @@ public abstract class AbstractSubscriptionTreeRegressionIT 
extends AbstractSubsc
     consume_data(consumer, session_dest);
   }
 
+  public void consume_data_await(
+      SubscriptionTreePullConsumer consumer,
+      Session session,
+      List<WrappedVoidSupplier> assertions) {
+    AWAIT.untilAsserted(
+        () -> {
+          List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+          if (messages.isEmpty()) {
+            session_src.executeNonQueryStatement("flush");
+          }
+          for (final SubscriptionMessage message : messages) {
+            for (final Iterator<Tablet> it = 
message.getSessionDataSetsHandler().tabletIterator();
+                it.hasNext(); ) {
+              final Tablet tablet = it.next();
+              session.insertTablet(tablet);
+            }
+          }
+          consumer.commitSync(messages);
+          for (final WrappedVoidSupplier assertion : assertions) {
+            assertion.get();
+          }
+        });
+  }
+
+  public void consume_tsfile_await(
+      SubscriptionTreePullConsumer consumer, List<String> devices, 
List<Integer> expected) {
+    final List<AtomicInteger> counters = new ArrayList<>(devices.size());
+    for (int i = 0; i < devices.size(); i++) {
+      counters.add(new AtomicInteger(0));
+    }
+    AWAIT.untilAsserted(
+        () -> {
+          List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+          if (messages.isEmpty()) {
+            session_src.executeNonQueryStatement("flush");
+          }
+          for (final SubscriptionMessage message : messages) {
+            final SubscriptionTsFileHandler tsFileHandler = 
message.getTsFileHandler();
+            try (final TsFileReader tsFileReader = tsFileHandler.openReader()) 
{
+              for (int i = 0; i < devices.size(); i++) {
+                final Path path = new Path(devices.get(i), "s_0", true);
+                final QueryDataSet dataSet =
+                    tsFileReader.query(
+                        
QueryExpression.create(Collections.singletonList(path), null));
+                while (dataSet.hasNext()) {
+                  dataSet.next();
+                  counters.get(i).addAndGet(1);
+                }
+              }
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          consumer.commitSync(messages);
+          for (int i = 0; i < devices.size(); i++) {
+            assertEquals(counters.get(i).get(), expected.get(i));
+          }
+        });
+  }
+
   //////////////////////////// strict assertions ////////////////////////////
 
   public static void assertEquals(int actual, int expected) {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
index f99d88b8a29..3468a6e93ca 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.subscription.consumer.AckStrategy;
 import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
 
 import org.apache.thrift.TException;
@@ -81,6 +82,15 @@ public class IoTDBDefaultTsfilePushConsumerIT extends 
AbstractSubscriptionTreeRe
     }
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
@@ -107,6 +117,7 @@ public class IoTDBDefaultTsfilePushConsumerIT extends 
AbstractSubscriptionTreeRe
       timestamp += 2000;
     }
     session_src.insertTablet(tablet);
+    session_src.executeNonQueryStatement("flush");
   }
 
   @Test
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
index d340363a41b..9ec58ef95f4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressio
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
 
 import org.apache.thrift.TException;
@@ -69,6 +70,15 @@ public class IoTDBRootPullConsumeTsfileIT extends 
AbstractSubscriptionTreeRegres
     session_src.executeNonQueryStatement("create database 
root.RootPullConsumeTsfile");
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
index a9fecef2dc9..1ae3269f239 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressio
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
 
 import org.apache.thrift.TException;
@@ -83,6 +84,15 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionTre
     assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2");
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
index 2cad94fc4ac..c24326fed95 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 @RunWith(IoTDBTestRunner.class)
@@ -115,6 +116,7 @@ public class IoTDBDevicePatternPullConsumerDataSetIT 
extends AbstractSubscriptio
       timestamp += row * 2000;
     }
     session_src.insertTablet(tablet);
+    session_src.executeNonQueryStatement("flush");
   }
 
   @Test
@@ -132,13 +134,18 @@ public class IoTDBDevicePatternPullConsumerDataSetIT 
extends AbstractSubscriptio
     assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after 
subscription");
     insert_data(System.currentTimeMillis() - 30000L);
     // Consumption data
-    consume_data(consumer, session_dest);
     String sql = "select count(s_0) from " + device;
-    System.out.println("src: " + getCount(session_src, sql));
-    check_count(8, sql, "Consumption data:" + pattern);
-    check_count(8, "select count(s_1) from " + device, "Consumption data: 
s_1");
-    check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption 
data:d_1");
-    check_count(0, "select count(s_0) from " + device2, "Consumption 
data:d_2");
+    consume_data_await(
+        consumer,
+        session_dest,
+        Collections.singletonList(
+            () -> {
+              System.out.println("src: " + getCount(session_src, sql));
+              check_count(8, sql, "Consumption data:" + pattern);
+              check_count(8, "select count(s_1) from " + device, "Consumption 
data: s_1");
+              check_count(0, "select count(s_0) from " + database + ".d_1", 
"Consumption data:d_1");
+              check_count(0, "select count(s_0) from " + device2, "Consumption 
data:d_2");
+            }));
     insert_data(System.currentTimeMillis());
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -149,8 +156,14 @@ public class IoTDBDevicePatternPullConsumerDataSetIT 
extends AbstractSubscriptio
     System.out.println("src: " + getCount(session_src, sql));
     // Consumption data: Progress is not retained after unsubscribing and then 
re-subscribing. Full
     // synchronization.
-    consume_data(consumer, session_dest);
-    check_count(12, "select count(s_0) from " + device, "consume data 
again:s_0");
-    check_count(12, "select count(s_1) from " + device, "Consumption data: 
s_1");
+    consume_data_await(
+        consumer,
+        session_dest,
+        Collections.singletonList(
+            () -> {
+              System.out.println("src: " + getCount(session_src, sql));
+              check_count(12, "select count(s_0) from " + device, "consume 
data again:s_0");
+              check_count(12, "select count(s_1) from " + device, "Consumption 
data: s_1");
+            }));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
index 2572c41de93..49562e5314c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressio
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
 
 import org.apache.thrift.TException;
@@ -41,6 +42,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /***
@@ -94,6 +96,16 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
     assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
   }
 
+  // TODO: remove it later
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
@@ -151,11 +163,7 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
     devices.add(device);
     devices.add(device2);
     devices.add(database2 + ".d_2");
-
-    List<Integer> rowCounts = consume_tsfile(consumer, devices);
-    assertEquals(rowCounts.get(0), 10);
-    assertEquals(rowCounts.get(1), 1);
-    assertEquals(rowCounts.get(2), 1);
+    consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1));
     // Unsubscribe
     consumer.unsubscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after 
cancellation");
@@ -165,14 +173,6 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
     insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
     // Consumption data: Progress is not retained after canceling and 
re-subscribing. Full
     // synchronization.
-    rowCounts = consume_tsfile(consumer, devices);
-
-    assertEquals(
-        rowCounts.get(0),
-        15,
-        "Unsubscribe and resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(
-        rowCounts.get(1), 1, "Cancel subscription and subscribe again," + 
database + ".d_1");
-    assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," + 
database2 + ".d_2");
+    consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
index d003cbceaed..b90b46a3e2d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.session.subscription.consumer.AckStrategy;
 import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
 
 import org.apache.thrift.TException;
@@ -100,6 +101,16 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT
     assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
   }
 
+  // TODO: remove it later
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
@@ -188,7 +199,8 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT
 
     // Consumption data: Progress is not retained when re-subscribing after 
cancellation. Full
     // synchronization.
-    AWAIT.untilAsserted(
+    IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH(
+        session_src,
         () -> {
           check_count(12, "select count(s_0) from " + device, "consume data 
again:s_0 " + device);
           check_count(0, "select count(s_1) from " + device, "Consumption 
data: s_1 " + device);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
index 17fa08cd9e2..ecd515964d0 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
 
 import org.apache.thrift.TException;
@@ -122,6 +123,15 @@ public class IoTDBMultiGroupVsMultiConsumerIT extends 
AbstractSubscriptionTreeRe
     subs.getTopics().forEach(System.out::println);
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {

Reply via email to