This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 7bfea0038a0 [To dev/1.3] Subscription: apply IoTConsensusV2 as cluster
mode for integration test (#15546) (#15852)
7bfea0038a0 is described below
commit 7bfea0038a09e4b00fce47bd381756790a70bce2
Author: VGalaxies <[email protected]>
AuthorDate: Thu Jul 3 15:20:37 2025 +0800
[To dev/1.3] Subscription: apply IoTConsensusV2 as cluster mode for
integration test (#15546) (#15852)
* Subscription: apply IoTConsensusV2 as cluster mode for integration test
(#15546)
* fixup! Subscription: apply IoTConsensusV2 as cluster mode for integration
test (#15546)
---
.../it/IoTDBSubscriptionITConstant.java | 28 ++++++++
.../AbstractSubscriptionRegressionIT.java | 80 ++++++++++++++++------
.../IoTDBDefaultTsfilePushConsumerIT.java | 11 +++
.../IoTDBRootPullConsumeTsfileIT.java | 10 +++
.../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 10 +++
.../IoTDBDevicePatternPullConsumerDataSetIT.java | 31 ++++++---
...IoTDBMiddleMatchPatternPullConsumeTsfileIT.java | 28 ++++----
...oTDBSnapshotTSPatternDatasetPushConsumerIT.java | 14 +++-
.../multi/IoTDBMultiGroupVsMultiConsumerIT.java | 10 +++
9 files changed, 178 insertions(+), 44 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index 5b8ec393274..3162139fb66 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -19,10 +19,15 @@
package org.apache.iotdb.subscription.it;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.session.Session;
+
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
public class IoTDBSubscriptionITConstant {
@@ -40,4 +45,27 @@ public class IoTDBSubscriptionITConstant {
public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
+
+ @FunctionalInterface
+ public interface WrappedVoidSupplier {
+ void get() throws Throwable;
+ }
+
+ public static void AWAIT_WITH_FLUSH(final Session session, final
WrappedVoidSupplier assertions) {
+ AWAIT.untilAsserted(
+ () -> {
+ session.executeNonQueryStatement("flush");
+ assertions.get();
+ });
+ }
+
+ public static Consumer<BaseEnv> FORCE_SCALABLE_SINGLE_NODE_MODE =
+ env ->
+ env.getConfig()
+ .getCommonConfig()
+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setSchemaReplicationFactor(1)
+ .setDataReplicationFactor(1);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
index fe32817ad00..0eb5798599d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.WrappedVoidSupplier;
import org.apache.iotdb.subscription.it.triple.AbstractSubscriptionTripleIT;
import org.apache.thrift.TException;
@@ -57,6 +58,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
public abstract class AbstractSubscriptionRegressionIT extends
AbstractSubscriptionTripleIT {
@@ -359,26 +361,6 @@ public abstract class AbstractSubscriptionRegressionIT
extends AbstractSubscript
return results;
}
- public static void consume_data_long(
- SubscriptionPullConsumer consumer, Session session, Long timeout)
- throws StatementExecutionException, InterruptedException,
IoTDBConnectionException {
- timeout = System.currentTimeMillis() + timeout;
- while (System.currentTimeMillis() < timeout) {
- List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
- if (messages.isEmpty()) {
- Thread.sleep(1000);
- }
- for (final SubscriptionMessage message : messages) {
- for (final Iterator<Tablet> it =
message.getSessionDataSetsHandler().tabletIterator();
- it.hasNext(); ) {
- final Tablet tablet = it.next();
- session.insertTablet(tablet);
- }
- }
- consumer.commitSync(messages);
- }
- }
-
public void consume_data(SubscriptionPullConsumer consumer)
throws TException,
IOException,
@@ -388,6 +370,64 @@ public abstract class AbstractSubscriptionRegressionIT
extends AbstractSubscript
consume_data(consumer, session_dest);
}
+ public void consume_data_await(
+ SubscriptionPullConsumer consumer, Session session,
List<WrappedVoidSupplier> assertions) {
+ AWAIT.untilAsserted(
+ () -> {
+ List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+ if (messages.isEmpty()) {
+ session_src.executeNonQueryStatement("flush");
+ }
+ for (final SubscriptionMessage message : messages) {
+ for (final Iterator<Tablet> it =
message.getSessionDataSetsHandler().tabletIterator();
+ it.hasNext(); ) {
+ final Tablet tablet = it.next();
+ session.insertTablet(tablet);
+ }
+ }
+ consumer.commitSync(messages);
+ for (final WrappedVoidSupplier assertion : assertions) {
+ assertion.get();
+ }
+ });
+ }
+
+ public void consume_tsfile_await(
+ SubscriptionPullConsumer consumer, List<String> devices, List<Integer>
expected) {
+ final List<AtomicInteger> counters = new ArrayList<>(devices.size());
+ for (int i = 0; i < devices.size(); i++) {
+ counters.add(new AtomicInteger(0));
+ }
+ AWAIT.untilAsserted(
+ () -> {
+ List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+ if (messages.isEmpty()) {
+ session_src.executeNonQueryStatement("flush");
+ }
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionTsFileHandler tsFileHandler =
message.getTsFileHandler();
+ try (final TsFileReader tsFileReader = tsFileHandler.openReader())
{
+ for (int i = 0; i < devices.size(); i++) {
+ final Path path = new Path(devices.get(i), "s_0", true);
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+
QueryExpression.create(Collections.singletonList(path), null));
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ counters.get(i).addAndGet(1);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ consumer.commitSync(messages);
+ for (int i = 0; i < devices.size(); i++) {
+ assertEquals(counters.get(i).get(), expected.get(i));
+ }
+ });
+ }
+
//////////////////////////// strict assertions ////////////////////////////
public static void assertEquals(int actual, int expected) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
index e56e8cef24d..ac3ee07bd7f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
import org.apache.thrift.TException;
@@ -80,6 +81,15 @@ public class IoTDBDefaultTsfilePushConsumerIT extends
AbstractSubscriptionRegres
}
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -106,6 +116,7 @@ public class IoTDBDefaultTsfilePushConsumerIT extends
AbstractSubscriptionRegres
timestamp += 2000;
}
session_src.insertTablet(tablet);
+ session_src.executeNonQueryStatement("flush");
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
index f513e66ea2b..6106a392c66 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMis
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
import org.apache.thrift.TException;
@@ -68,6 +69,15 @@ public class IoTDBRootPullConsumeTsfileIT extends
AbstractSubscriptionRegression
session_src.executeNonQueryStatement("create database
root.RootPullConsumeTsfile");
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
index 8ae086e6151..a28f617f32d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionCon
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
import org.apache.thrift.TException;
@@ -82,6 +83,15 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2");
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
index 1e6a59a7ace..3dbb923baed 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
@RunWith(IoTDBTestRunner.class)
@@ -114,6 +115,7 @@ public class IoTDBDevicePatternPullConsumerDataSetIT
extends AbstractSubscriptio
timestamp += row * 2000;
}
session_src.insertTablet(tablet);
+ session_src.executeNonQueryStatement("flush");
}
@Test
@@ -131,13 +133,18 @@ public class IoTDBDevicePatternPullConsumerDataSetIT
extends AbstractSubscriptio
assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after
subscription");
insert_data(System.currentTimeMillis() - 30000L);
// Consumption data
- consume_data(consumer, session_dest);
String sql = "select count(s_0) from " + device;
- System.out.println("src: " + getCount(session_src, sql));
- check_count(8, sql, "Consumption data:" + pattern);
- check_count(8, "select count(s_1) from " + device, "Consumption data:
s_1");
- check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption
data:d_1");
- check_count(0, "select count(s_0) from " + device2, "Consumption
data:d_2");
+ consume_data_await(
+ consumer,
+ session_dest,
+ Collections.singletonList(
+ () -> {
+ System.out.println("src: " + getCount(session_src, sql));
+ check_count(8, sql, "Consumption data:" + pattern);
+ check_count(8, "select count(s_1) from " + device, "Consumption
data: s_1");
+ check_count(0, "select count(s_0) from " + database + ".d_1",
"Consumption data:d_1");
+ check_count(0, "select count(s_0) from " + device2, "Consumption
data:d_2");
+ }));
insert_data(System.currentTimeMillis());
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -148,8 +155,14 @@ public class IoTDBDevicePatternPullConsumerDataSetIT
extends AbstractSubscriptio
System.out.println("src: " + getCount(session_src, sql));
// Consumption data: Progress is not retained after unsubscribing and then
re-subscribing. Full
// synchronization.
- consume_data(consumer, session_dest);
- check_count(12, "select count(s_0) from " + device, "consume data
again:s_0");
- check_count(12, "select count(s_1) from " + device, "Consumption data:
s_1");
+ consume_data_await(
+ consumer,
+ session_dest,
+ Collections.singletonList(
+ () -> {
+ System.out.println("src: " + getCount(session_src, sql));
+ check_count(12, "select count(s_0) from " + device, "consume
data again:s_0");
+ check_count(12, "select count(s_1) from " + device, "Consumption
data: s_1");
+ }));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
index 291f666b9ea..e667d8212c2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionCon
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
import org.apache.thrift.TException;
@@ -40,6 +41,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/***
@@ -92,6 +94,16 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
extends AbstractSubscrip
assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
}
+ // TODO: remove it later
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -149,11 +161,7 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
extends AbstractSubscrip
devices.add(device);
devices.add(device2);
devices.add(database2 + ".d_2");
-
- List<Integer> rowCounts = consume_tsfile(consumer, devices);
- assertEquals(rowCounts.get(0), 10);
- assertEquals(rowCounts.get(1), 1);
- assertEquals(rowCounts.get(2), 1);
+ consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after
cancellation");
@@ -163,14 +171,6 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT
extends AbstractSubscrip
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained after canceling and
re-subscribing. Full
// synchronization.
- rowCounts = consume_tsfile(consumer, devices);
-
- assertEquals(
- rowCounts.get(0),
- 15,
- "Unsubscribe and resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(
- rowCounts.get(1), 1, "Cancel subscription and subscribe again," +
database + ".d_1");
- assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," +
database2 + ".d_2");
+ consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
index ea4bc9d811f..f19f0594537 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.session.subscription.consumer.AckStrategy;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.iotdb.subscription.it.Retry;
import org.apache.iotdb.subscription.it.RetryRule;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
@@ -104,6 +105,16 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT
extends AbstractSubscri
assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
}
+ // TODO: remove it later
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -194,7 +205,8 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT
extends AbstractSubscri
// Consumption data: Progress is not retained when re-subscribing after
cancellation. Full
// synchronization.
- AWAIT.untilAsserted(
+ IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH(
+ session_src,
() -> {
check_count(12, "select count(s_0) from " + device, "consume data
again:s_0 " + device);
check_count(0, "select count(s_1) from " + device, "Consumption
data: s_1 " + device);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
index 0751e922820..af39d745933 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
import org.apache.thrift.TException;
@@ -121,6 +122,15 @@ public class IoTDBMultiGroupVsMultiConsumerIT extends
AbstractSubscriptionRegres
subs.getTopics().forEach(System.out::println);
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {