This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 444f57b5ac3 Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546)
444f57b5ac3 is described below
commit 444f57b5ac3a395279c565f5e93ca4cf5856eade
Author: VGalaxies <[email protected]>
AuthorDate: Thu May 29 17:27:53 2025 +0800
Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546)
---
.github/workflows/pipe-it.yml | 6 +-
.../it/IoTDBSubscriptionITConstant.java | 28 ++++++++
.../AbstractSubscriptionTreeRegressionIT.java | 82 ++++++++++++++++------
.../IoTDBDefaultTsfilePushConsumerIT.java | 11 +++
.../IoTDBRootPullConsumeTsfileIT.java | 10 +++
.../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 10 +++
.../IoTDBDevicePatternPullConsumerDataSetIT.java | 31 +++++---
...IoTDBMiddleMatchPatternPullConsumeTsfileIT.java | 28 ++++----
...oTDBSnapshotTSPatternDatasetPushConsumerIT.java | 14 +++-
.../multi/IoTDBMultiGroupVsMultiConsumerIT.java | 10 +++
10 files changed, 183 insertions(+), 47 deletions(-)
diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml
index ba09fb4e27e..2c1b03623d1 100644
--- a/.github/workflows/pipe-it.yml
+++ b/.github/workflows/pipe-it.yml
@@ -438,7 +438,7 @@ jobs:
matrix:
java: [ 17 ]
# StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet.
- cluster1: [ ScalableSingleNodeMode ]
+ cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ]
cluster2: [ ScalableSingleNodeMode ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
@@ -606,7 +606,7 @@ jobs:
matrix:
java: [ 17 ]
# do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal
- cluster1: [ ScalableSingleNodeMode ]
+ cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ]
cluster2: [ ScalableSingleNodeMode ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
@@ -690,7 +690,7 @@ jobs:
matrix:
java: [ 17 ]
# do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal
- cluster1: [ ScalableSingleNodeMode ]
+ cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ]
cluster2: [ ScalableSingleNodeMode ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index 5b8ec393274..3162139fb66 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -19,10 +19,15 @@
package org.apache.iotdb.subscription.it;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.session.Session;
+
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
public class IoTDBSubscriptionITConstant {
@@ -40,4 +45,27 @@ public class IoTDBSubscriptionITConstant {
public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
+
+ @FunctionalInterface
+ public interface WrappedVoidSupplier {
+ void get() throws Throwable;
+ }
+
+ public static void AWAIT_WITH_FLUSH(final Session session, final WrappedVoidSupplier assertions) {
+ AWAIT.untilAsserted(
+ () -> {
+ session.executeNonQueryStatement("flush");
+ assertions.get();
+ });
+ }
+
+ public static Consumer<BaseEnv> FORCE_SCALABLE_SINGLE_NODE_MODE =
+ env ->
+ env.getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setSchemaReplicationFactor(1)
+ .setDataReplicationFactor(1);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java
index 4b0164801cf..288f202ee96 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.session.subscription.SubscriptionTreeSession;
import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.WrappedVoidSupplier;
import org.apache.iotdb.subscription.it.triple.AbstractSubscriptionTripleIT;
import org.apache.thrift.TException;
@@ -57,6 +58,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
public abstract class AbstractSubscriptionTreeRegressionIT extends
AbstractSubscriptionTripleIT {
@@ -359,26 +361,6 @@ public abstract class AbstractSubscriptionTreeRegressionIT
extends AbstractSubsc
return results;
}
- public static void consume_data_long(
- SubscriptionTreePullConsumer consumer, Session session, Long timeout)
- throws StatementExecutionException, InterruptedException,
IoTDBConnectionException {
- timeout = System.currentTimeMillis() + timeout;
- while (System.currentTimeMillis() < timeout) {
- List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
- if (messages.isEmpty()) {
- Thread.sleep(1000);
- }
- for (final SubscriptionMessage message : messages) {
- for (final Iterator<Tablet> it =
message.getSessionDataSetsHandler().tabletIterator();
- it.hasNext(); ) {
- final Tablet tablet = it.next();
- session.insertTablet(tablet);
- }
- }
- consumer.commitSync(messages);
- }
- }
-
public void consume_data(SubscriptionTreePullConsumer consumer)
throws TException,
IOException,
@@ -388,6 +370,66 @@ public abstract class AbstractSubscriptionTreeRegressionIT
extends AbstractSubsc
consume_data(consumer, session_dest);
}
+ public void consume_data_await(
+ SubscriptionTreePullConsumer consumer,
+ Session session,
+ List<WrappedVoidSupplier> assertions) {
+ AWAIT.untilAsserted(
+ () -> {
+ List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+ if (messages.isEmpty()) {
+ session_src.executeNonQueryStatement("flush");
+ }
+ for (final SubscriptionMessage message : messages) {
+ for (final Iterator<Tablet> it = message.getSessionDataSetsHandler().tabletIterator();
+ it.hasNext(); ) {
+ final Tablet tablet = it.next();
+ session.insertTablet(tablet);
+ }
+ }
+ consumer.commitSync(messages);
+ for (final WrappedVoidSupplier assertion : assertions) {
+ assertion.get();
+ }
+ });
+ }
+
+ public void consume_tsfile_await(
+ SubscriptionTreePullConsumer consumer, List<String> devices, List<Integer> expected) {
+ final List<AtomicInteger> counters = new ArrayList<>(devices.size());
+ for (int i = 0; i < devices.size(); i++) {
+ counters.add(new AtomicInteger(0));
+ }
+ AWAIT.untilAsserted(
+ () -> {
+ List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+ if (messages.isEmpty()) {
+ session_src.executeNonQueryStatement("flush");
+ }
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionTsFileHandler tsFileHandler =
message.getTsFileHandler();
+ try (final TsFileReader tsFileReader = tsFileHandler.openReader())
{
+ for (int i = 0; i < devices.size(); i++) {
+ final Path path = new Path(devices.get(i), "s_0", true);
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+
QueryExpression.create(Collections.singletonList(path), null));
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ counters.get(i).addAndGet(1);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ consumer.commitSync(messages);
+ for (int i = 0; i < devices.size(); i++) {
+ assertEquals(counters.get(i).get(), expected.get(i));
+ }
+ });
+ }
+
//////////////////////////// strict assertions ////////////////////////////
public static void assertEquals(int actual, int expected) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
index f99d88b8a29..3468a6e93ca 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
import org.apache.thrift.TException;
@@ -81,6 +82,15 @@ public class IoTDBDefaultTsfilePushConsumerIT extends
AbstractSubscriptionTreeRe
}
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -107,6 +117,7 @@ public class IoTDBDefaultTsfilePushConsumerIT extends
AbstractSubscriptionTreeRe
timestamp += 2000;
}
session_src.insertTablet(tablet);
+ session_src.executeNonQueryStatement("flush");
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
index d340363a41b..9ec58ef95f4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressio
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
import org.apache.thrift.TException;
@@ -69,6 +70,15 @@ public class IoTDBRootPullConsumeTsfileIT extends
AbstractSubscriptionTreeRegres
session_src.executeNonQueryStatement("create database
root.RootPullConsumeTsfile");
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
index a9fecef2dc9..1ae3269f239 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressio
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
import org.apache.thrift.TException;
@@ -83,6 +84,15 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionTre
assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2");
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
index 2cad94fc4ac..c24326fed95 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
@RunWith(IoTDBTestRunner.class)
@@ -115,6 +116,7 @@ public class IoTDBDevicePatternPullConsumerDataSetIT
extends AbstractSubscriptio
timestamp += row * 2000;
}
session_src.insertTablet(tablet);
+ session_src.executeNonQueryStatement("flush");
}
@Test
@@ -132,13 +134,18 @@ public class IoTDBDevicePatternPullConsumerDataSetIT
extends AbstractSubscriptio
assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after
subscription");
insert_data(System.currentTimeMillis() - 30000L);
// Consumption data
- consume_data(consumer, session_dest);
String sql = "select count(s_0) from " + device;
- System.out.println("src: " + getCount(session_src, sql));
- check_count(8, sql, "Consumption data:" + pattern);
- check_count(8, "select count(s_1) from " + device, "Consumption data:
s_1");
- check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption
data:d_1");
- check_count(0, "select count(s_0) from " + device2, "Consumption
data:d_2");
+ consume_data_await(
+ consumer,
+ session_dest,
+ Collections.singletonList(
+ () -> {
+ System.out.println("src: " + getCount(session_src, sql));
+ check_count(8, sql, "Consumption data:" + pattern);
+ check_count(8, "select count(s_1) from " + device, "Consumption
data: s_1");
+ check_count(0, "select count(s_0) from " + database + ".d_1",
"Consumption data:d_1");
+ check_count(0, "select count(s_0) from " + device2, "Consumption
data:d_2");
+ }));
insert_data(System.currentTimeMillis());
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -149,8 +156,14 @@ public class IoTDBDevicePatternPullConsumerDataSetIT
extends AbstractSubscriptio
System.out.println("src: " + getCount(session_src, sql));
// Consumption data: Progress is not retained after unsubscribing and then
re-subscribing. Full
// synchronization.
- consume_data(consumer, session_dest);
- check_count(12, "select count(s_0) from " + device, "consume data
again:s_0");
- check_count(12, "select count(s_1) from " + device, "Consumption data:
s_1");
+ consume_data_await(
+ consumer,
+ session_dest,
+ Collections.singletonList(
+ () -> {
+ System.out.println("src: " + getCount(session_src, sql));
+ check_count(12, "select count(s_0) from " + device, "consume
data again:s_0");
+ check_count(12, "select count(s_1) from " + device, "Consumption
data: s_1");
+ }));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
index 2572c41de93..49562e5314c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressio
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
import org.apache.thrift.TException;
@@ -41,6 +42,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/***
@@ -94,6 +96,16 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
}
+ // TODO: remove it later
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -151,11 +163,7 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
devices.add(device);
devices.add(device2);
devices.add(database2 + ".d_2");
-
- List<Integer> rowCounts = consume_tsfile(consumer, devices);
- assertEquals(rowCounts.get(0), 10);
- assertEquals(rowCounts.get(1), 1);
- assertEquals(rowCounts.get(2), 1);
+ consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after
cancellation");
@@ -165,14 +173,6 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained after canceling and
re-subscribing. Full
// synchronization.
- rowCounts = consume_tsfile(consumer, devices);
-
- assertEquals(
- rowCounts.get(0),
- 15,
- "Unsubscribe and resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(
- rowCounts.get(1), 1, "Cancel subscription and subscribe again," +
database + ".d_1");
- assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," +
database2 + ".d_2");
+ consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
index d003cbceaed..b90b46a3e2d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.session.subscription.consumer.AckStrategy;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
import org.apache.thrift.TException;
@@ -100,6 +101,16 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT
assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
}
+ // TODO: remove it later
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -188,7 +199,8 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT
// Consumption data: Progress is not retained when re-subscribing after
cancellation. Full
// synchronization.
- AWAIT.untilAsserted(
+ IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH(
+ session_src,
() -> {
check_count(12, "select count(s_0) from " + device, "consume data
again:s_0 " + device);
check_count(0, "select count(s_1) from " + device, "Consumption
data: s_1 " + device);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
index 17fa08cd9e2..ecd515964d0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
import org.apache.thrift.TException;
@@ -122,6 +123,15 @@ public class IoTDBMultiGroupVsMultiConsumerIT extends
AbstractSubscriptionTreeRe
subs.getTopics().forEach(System.out::println);
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {