This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d6c9bbc88c8 Subscription: refactor `IoTDBSubscriptionConsumerGroupIT`
to support multi-protocol pipe sync reference (#12288)
d6c9bbc88c8 is described below
commit d6c9bbc88c836e127b4b99c6b1e64e7d561259fd
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Apr 3 14:49:35 2024 +0800
Subscription: refactor `IoTDBSubscriptionConsumerGroupIT` to support
multi-protocol pipe sync reference (#12288)
---
.../org/apache/iotdb/db/it/utils/TestUtils.java | 5 +
.../it/dual/AbstractSubscriptionDualIT.java | 1 +
.../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 725 ++++++++++++++-------
3 files changed, 504 insertions(+), 227 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index c7c72738d92..b9306937a96 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -59,6 +61,8 @@ import static org.junit.Assert.fail;
public class TestUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestUtils.class);
+
public static void prepareData(String[] sqls) {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
@@ -360,6 +364,7 @@ public class TestUtils {
}
String expected = new TreeMap<>(expectedHeaderWithResult).toString();
String actual = new TreeMap<>(actualHeaderWithResult).toString();
+ LOGGER.info("[AssertSingleResultSetEqual] expected {}, actual {}",
expected, actual);
assertEquals(expected, actual);
assertFalse(actualResultSet.next());
} catch (Exception e) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 6f2dbd41230..d8ee2ecaf24 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -38,6 +38,7 @@ abstract class AbstractSubscriptionDualIT {
senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+
receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);
senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 54d3867c99c..d5f2d09f34b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -32,9 +33,11 @@ import
org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.awaitility.Awaitility;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -45,11 +48,16 @@ import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import static org.junit.Assert.fail;
@@ -57,283 +65,555 @@ import static org.junit.Assert.fail;
@Category({MultiClusterIT2Subscription.class})
public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT {
+ // Test dimensions:
+ // 1. multi scenario of consumer, consumer group and subscribed topic
+ // 2. historical or realtime data
+ // 3. multi pipe sync protocol for reference
+
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionConsumerGroupIT.class);
- // --------------------------- //
- // Scenario 1: Historical Data //
- // --------------------------- //
+ private static Map<String, String> ASYNC_CONNECTOR_ATTRIBUTES;
+ private static Map<String, String> SYNC_CONNECTOR_ATTRIBUTES;
+ private static Map<String, String> LEGACY_CONNECTOR_ATTRIBUTES;
+ private static Map<String, String> AIR_GAP_CONNECTOR_ATTRIBUTES;
+
+ private static Pair<List<SubscriptionInfo>, Map<String, String>>
__3C_1CG_SUBSCRIBE_ONE_TOPIC;
+ private static Pair<List<SubscriptionInfo>, Map<String, String>>
__3C_3CG_SUBSCRIBE_ONE_TOPIC;
+ private static Pair<List<SubscriptionInfo>, Map<String, String>>
__3C_1CG_SUBSCRIBE_TWO_TOPIC;
+ private static Pair<List<SubscriptionInfo>, Map<String, String>>
__3C_3CG_SUBSCRIBE_TWO_TOPIC;
+ private static Pair<List<SubscriptionInfo>, Map<String, String>>
__4C_2CG_SUBSCRIBE_TWO_TOPIC;
+
+ static final class SubscriptionInfo {
+ final String consumerId;
+ final String consumerGroupId;
+ final Set<String> topicNames;
+
+ SubscriptionInfo(
+ final String consumerId, final String consumerGroupId, final
Set<String> topicNames) {
+ this.consumerId = consumerId;
+ this.consumerGroupId = consumerGroupId;
+ this.topicNames = topicNames;
+ }
+ }
- @Test
- public void test3C1CGSubscribeOneTopicHistoricalData() throws Exception {
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ // Setup connector attributes
+ ASYNC_CONNECTOR_ATTRIBUTES = new HashMap<>();
+ ASYNC_CONNECTOR_ATTRIBUTES.put("connector",
"iotdb-thrift-async-connector");
+ ASYNC_CONNECTOR_ATTRIBUTES.put("connector.ip", receiverEnv.getIP());
+ ASYNC_CONNECTOR_ATTRIBUTES.put("connector.port", receiverEnv.getPort());
+
+ SYNC_CONNECTOR_ATTRIBUTES = new HashMap<>();
+ SYNC_CONNECTOR_ATTRIBUTES.put("connector", "iotdb-thrift-sync-connector");
+ SYNC_CONNECTOR_ATTRIBUTES.put("connector.ip", receiverEnv.getIP());
+ SYNC_CONNECTOR_ATTRIBUTES.put("connector.port", receiverEnv.getPort());
+
+ LEGACY_CONNECTOR_ATTRIBUTES = new HashMap<>();
+ LEGACY_CONNECTOR_ATTRIBUTES.put("connector",
"iotdb-legacy-pipe-connector");
+ LEGACY_CONNECTOR_ATTRIBUTES.put("connector.ip", receiverEnv.getIP());
+ LEGACY_CONNECTOR_ATTRIBUTES.put("connector.port", receiverEnv.getPort());
+
+ final StringBuilder nodeUrlsBuilder = new StringBuilder();
+ for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList())
{
+ // Use default port for convenience
+ nodeUrlsBuilder
+ .append(wrapper.getIp())
+ .append(":")
+ .append(wrapper.getPipeAirGapReceiverPort())
+ .append(",");
+ }
+ AIR_GAP_CONNECTOR_ATTRIBUTES = new HashMap<>();
+ AIR_GAP_CONNECTOR_ATTRIBUTES.put("connector", "iotdb-air-gap-connector");
+ AIR_GAP_CONNECTOR_ATTRIBUTES.put("connector.node-urls",
nodeUrlsBuilder.toString());
+
+ // Setup subscription info list with expected results
+ {
+ final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+ subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1",
Collections.singleton("topic1")));
+ subscriptionInfoList.add(new SubscriptionInfo("c2", "cg1",
Collections.singleton("topic1")));
+ subscriptionInfoList.add(new SubscriptionInfo("c3", "cg1",
Collections.singleton("topic1")));
+
+ final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+ expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC = new Pair<>(subscriptionInfoList,
expectedHeaderWithResult);
+ }
+ {
+ final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+ subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1",
Collections.singleton("topic1")));
+ subscriptionInfoList.add(new SubscriptionInfo("c2", "cg2",
Collections.singleton("topic1")));
+ subscriptionInfoList.add(new SubscriptionInfo("c3", "cg3",
Collections.singleton("topic1")));
+
+ final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+ expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.cg2.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.cg3.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC = new Pair<>(subscriptionInfoList,
expectedHeaderWithResult);
+ }
+ {
+ final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+ subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1",
Collections.singleton("topic1")));
+ subscriptionInfoList.add(
+ new SubscriptionInfo("c2", "cg1", new
HashSet<>(Arrays.asList("topic1", "topic2"))));
+ subscriptionInfoList.add(new SubscriptionInfo("c3", "cg1",
Collections.singleton("topic2")));
+
+ final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+ expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.cg1.topic2.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC = new Pair<>(subscriptionInfoList,
expectedHeaderWithResult);
+ }
+ {
+ final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+ subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1",
Collections.singleton("topic1")));
+ subscriptionInfoList.add(
+ new SubscriptionInfo("c2", "cg2", new
HashSet<>(Arrays.asList("topic1", "topic2"))));
+ subscriptionInfoList.add(new SubscriptionInfo("c3", "cg3",
Collections.singleton("topic2")));
+
+ final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+ expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.cg2.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.cg2.topic2.s)", "100");
+ expectedHeaderWithResult.put("count(root.cg3.topic2.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC = new Pair<>(subscriptionInfoList,
expectedHeaderWithResult);
+ }
+ {
+ final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+ subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1",
Collections.singleton("topic1")));
+ subscriptionInfoList.add(
+ new SubscriptionInfo("c2", "cg2", new
HashSet<>(Arrays.asList("topic1", "topic2"))));
+ subscriptionInfoList.add(new SubscriptionInfo("c3", "cg1",
Collections.singleton("topic1")));
+ subscriptionInfoList.add(new SubscriptionInfo("c4", "cg2",
Collections.singleton("topic2")));
+
+ final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+ expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.cg2.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.cg2.topic2.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+ expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC = new Pair<>(subscriptionInfoList,
expectedHeaderWithResult);
+ }
+ }
+
+ private void testSubscriptionHistoricalDataTemplate(
+ final Map<String, String> connectorAttributes,
+ final List<SubscriptionInfo> subscriptionInfoList,
+ final Map<String, String> expectedHeaderWithResult)
+ throws Exception {
final long currentTime = System.currentTimeMillis();
- // Historical data
+ // Insert some historical data
insertData(currentTime);
+ // Create topic 'topic1' and 'topic2'
createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
+ // Create pipe 'sync_topic1' and 'sync_topic2' with given connector attributes
+ createPipes(currentTime, connectorAttributes);
+
+ // Create subscription and check result
pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ subscriptionInfoList.stream()
+ .map(
+ (info) -> {
+ try {
+ return createConsumerAndSubscribeTopics(info);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ return null;
+ }
+ })
+ .collect(Collectors.toList()),
+ expectedHeaderWithResult);
}
- @Test
- public void test3C3CGSubscribeOneTopicHistoricalData() throws Exception {
+ private void testSubscriptionRealtimeDataTemplate(
+ final Map<String, String> connectorAttributes,
+ final List<SubscriptionInfo> subscriptionInfoList,
+ final Map<String, String> expectedHeaderWithResult)
+ throws Exception {
final long currentTime = System.currentTimeMillis();
- // Historical data
- insertData(currentTime);
-
+ // Create topic 'topic1' and 'topic2'
createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic1"));
+ // Create pipe 'sync_topic1' and 'sync_topic2' with given connector attributes
+ createPipes(currentTime, connectorAttributes);
+
+ // Insert some realtime data
+ insertData(currentTime);
+
+ // Create subscription and check result
pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.cg2.topic1.s)", "100");
- put("count(root.cg3.topic1.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ subscriptionInfoList.stream()
+ .map(
+ (info) -> {
+ try {
+ return createConsumerAndSubscribeTopics(info);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ return null;
+ }
+ })
+ .collect(Collectors.toList()),
+ expectedHeaderWithResult);
}
+ // -------------------------------------- //
+ // 3 consumers, 1 consumer group, 1 topic //
+ // -------------------------------------- //
+
@Test
- public void test3C1CGSubscribeTwoTopicHistoricalData() throws Exception {
- final long currentTime = System.currentTimeMillis();
+ public void test3C1CGSubscribeOneTopicHistoricalDataWithAsyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- // Historical data
- insertData(currentTime);
+ @Test
+ public void test3C1CGSubscribeOneTopicHistoricalDataWithSyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1",
"topic2"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic2"));
+ @Test
+ public void test3C1CGSubscribeOneTopicHistoricalDataWithLegacyConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.cg1.topic2.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ @Test
+ public void test3C1CGSubscribeOneTopicHistoricalDataWithAirGapConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
}
@Test
- public void test3C3CGSubscribeTwoTopicHistoricalData() throws Exception {
- final long currentTime = System.currentTimeMillis();
+ public void test3C1CGSubscribeOneTopicRealtimeDataWithAsyncConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- // Historical data
- insertData(currentTime);
+ @Test
+ public void test3C1CGSubscribeOneTopicRealtimeDataWithSyncConnector() throws
Exception {
+ testSubscriptionRealtimeDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1",
"topic2"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic2"));
+ @Test
+ public void test3C1CGSubscribeOneTopicRealtimeDataWithLegacyConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.cg2.topic1.s)", "100");
- put("count(root.cg2.topic2.s)", "100");
- put("count(root.cg3.topic2.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ @Test
+ public void test3C1CGSubscribeOneTopicRealtimeDataWithAirGapConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
}
+ // --------------------------------------- //
+ // 3 consumers, 3 consumer groups, 1 topic //
+ // --------------------------------------- //
+
@Test
- public void test4C2CGSubscribeTwoTopicHistoricalData() throws Exception {
- final long currentTime = System.currentTimeMillis();
+ public void test3C3CGSubscribeOneTopicHistoricalDataWithAsyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- // Historical data
- insertData(currentTime);
+ @Test
+ public void test3C3CGSubscribeOneTopicHistoricalDataWithSyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1",
"topic2"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c4", "cg2", "topic2"));
+ @Test
+ public void test3C3CGSubscribeOneTopicHistoricalDataWithLegacyConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.cg2.topic1.s)", "100");
- put("count(root.cg2.topic2.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ @Test
+ public void test3C3CGSubscribeOneTopicHistoricalDataWithAirGapConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
}
- // --------------------------- //
- // Scenario 2: Realtime Data //
- // --------------------------- //
+ @Test
+ public void test3C3CGSubscribeOneTopicRealtimeDataWithAsyncConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
@Test
- public void test3C1CGSubscribeOneTopicRealtimeData() throws Exception {
- final long currentTime = System.currentTimeMillis();
+ public void test3C3CGSubscribeOneTopicRealtimeDataWithSyncConnector() throws
Exception {
+ testSubscriptionRealtimeDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
+ @Test
+ public void test3C3CGSubscribeOneTopicRealtimeDataWithLegacyConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- // Realtime data
- insertData(currentTime);
+ @Test
+ public void test3C3CGSubscribeOneTopicRealtimeDataWithAirGapConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+ }
- pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ // --------------------------------------- //
+ // 3 consumers, 1 consumer group, 2 topics //
+ // --------------------------------------- //
+
+ @Test
+ public void test3C1CGSubscribeTwoTopicHistoricalDataWithAsyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
}
@Test
- public void test3C3CGSubscribeOneTopicRealtimeData() throws Exception {
- final long currentTime = System.currentTimeMillis();
+ public void test3C1CGSubscribeTwoTopicHistoricalDataWithSyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic1"));
+ @Test
+ public void test3C1CGSubscribeTwoTopicHistoricalDataWithLegacyConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- // Realtime data
- insertData(currentTime);
+ @Test
+ public void test3C1CGSubscribeTwoTopicHistoricalDataWithAirGapConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.cg2.topic1.s)", "100");
- put("count(root.cg3.topic1.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ @Test
+ public void test3C1CGSubscribeTwoTopicRealtimeDataWithAsyncConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
}
@Test
- public void test3C1CGSubscribeTwoTopicRealtimeData() throws Exception {
- final long currentTime = System.currentTimeMillis();
+ public void test3C1CGSubscribeTwoTopicRealtimeDataWithSyncConnector() throws
Exception {
+ testSubscriptionRealtimeDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1",
"topic2"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic2"));
+ @Test
+ public void test3C1CGSubscribeTwoTopicRealtimeDataWithLegacyConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- // Realtime data
- insertData(currentTime);
+ @Test
+ public void test3C1CGSubscribeTwoTopicRealtimeDataWithAirGapConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.cg1.topic2.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ // ---------------------------------------- //
+ // 3 consumers, 3 consumer groups, 2 topics //
+ // ---------------------------------------- //
+
+ @Test
+ public void test3C3CGSubscribeTwoTopicHistoricalDataWithAsyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
}
@Test
- public void test3C3CGSubscribeTwoTopicRealtimeData() throws Exception {
- final long currentTime = System.currentTimeMillis();
+ public void test3C3CGSubscribeTwoTopicHistoricalDataWithSyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1",
"topic2"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic2"));
+ @Test
+ public void test3C3CGSubscribeTwoTopicHistoricalDataWithLegacyConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- // Realtime data
- insertData(currentTime);
+ @Test
+ public void test3C3CGSubscribeTwoTopicHistoricalDataWithAirGapConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.cg2.topic1.s)", "100");
- put("count(root.cg2.topic2.s)", "100");
- put("count(root.cg3.topic2.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ @Test
+ public void test3C3CGSubscribeTwoTopicRealtimeDataWithAsyncConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
}
@Test
- public void test4C2CGSubscribeTwoTopicRealtimeData() throws Exception {
- final long currentTime = System.currentTimeMillis();
+ public void test3C3CGSubscribeTwoTopicRealtimeDataWithSyncConnector() throws
Exception {
+ testSubscriptionRealtimeDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- createTopics(currentTime);
- createPipes(currentTime);
- final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
- consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1",
"topic2"));
- consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
- consumers.add(createConsumerAndSubscribeTopics("c4", "cg2", "topic2"));
-
- // Realtime data
- insertData(currentTime);
+ @Test
+ public void test3C3CGSubscribeTwoTopicRealtimeDataWithLegacyConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
- pollMessagesAndCheck(
- consumers,
- new HashMap<String, String>() {
- {
- put("count(root.cg1.topic1.s)", "100");
- put("count(root.cg2.topic1.s)", "100");
- put("count(root.cg2.topic2.s)", "100");
- put("count(root.topic1.s)", "100");
- put("count(root.topic2.s)", "100");
- }
- });
+ @Test
+ public void test3C3CGSubscribeTwoTopicRealtimeDataWithAirGapConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+ __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
+
+ // ---------------------------------------- //
+ // 4 consumers, 2 consumer groups, 2 topics //
+ // ---------------------------------------- //
+
+ @Test
+ public void test4C2CGSubscribeTwoTopicHistoricalDataWithAsyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
+
+ @Test
+ public void test4C2CGSubscribeTwoTopicHistoricalDataWithSyncConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
+
+ @Test
+ public void test4C2CGSubscribeTwoTopicHistoricalDataWithLegacyConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
+
+ @Test
+ public void test4C2CGSubscribeTwoTopicHistoricalDataWithAirGapConnector()
throws Exception {
+ testSubscriptionHistoricalDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
+
+ @Test
+ public void test4C2CGSubscribeTwoTopicRealtimeDataWithAsyncConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ ASYNC_CONNECTOR_ATTRIBUTES,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
+
+ @Test
+ public void test4C2CGSubscribeTwoTopicRealtimeDataWithSyncConnector() throws
Exception {
+ testSubscriptionRealtimeDataTemplate(
+ SYNC_CONNECTOR_ATTRIBUTES,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
+
+ @Test
+ public void test4C2CGSubscribeTwoTopicRealtimeDataWithLegacyConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ LEGACY_CONNECTOR_ATTRIBUTES,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+ }
+
+ @Test
+ public void test4C2CGSubscribeTwoTopicRealtimeDataWithAirGapConnector()
throws Exception {
+ testSubscriptionRealtimeDataTemplate(
+ AIR_GAP_CONNECTOR_ATTRIBUTES,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+ __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
}
/////////////////////////////// utility ///////////////////////////////
@@ -368,17 +648,12 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
}
}
- private void createPipes(final long currentTime) {
+ private void createPipes(final long currentTime, final Map<String, String> connectorAttributes) {
// For sync reference
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
-
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.ip", receiverEnv.getIP());
- connectorAttributes.put("connector.port", receiverEnv.getPort());
extractorAttributes.put("inclusion", "data.insert");
extractorAttributes.put("inclusion.exclusion", "data.delete");
@@ -399,11 +674,6 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
-
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.ip", receiverEnv.getIP());
- connectorAttributes.put("connector.port", receiverEnv.getPort());
extractorAttributes.put("inclusion", "data.insert");
extractorAttributes.put("inclusion.exclusion", "data.delete");
@@ -422,18 +692,17 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
}
private SubscriptionPullConsumer createConsumerAndSubscribeTopics(
- final String consumerId, final String consumerGroupId, final String... topicNames)
- throws Exception {
+ final SubscriptionInfo subscriptionInfo) throws Exception {
final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(senderEnv.getIP())
.port(Integer.parseInt(senderEnv.getPort()))
- .consumerId(consumerId)
- .consumerGroupId(consumerGroupId)
+ .consumerId(subscriptionInfo.consumerId)
+ .consumerGroupId(subscriptionInfo.consumerGroupId)
.autoCommit(false)
.buildPullConsumer();
consumer.open();
- consumer.subscribe(topicNames);
+ consumer.subscribe(subscriptionInfo.topicNames);
return consumer;
}
@@ -540,12 +809,14 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
String.format(
"insert into root.%s.topic1(time, s) values (%s, 1)",
consumerGroupId, record.getTimestamp());
+ LOGGER.info(sql);
return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
} else if ("root.topic2.s".equals(columnName)) {
final String sql =
String.format(
"insert into root.%s.topic2(time, s) values (%s, 1)",
consumerGroupId, record.getTimestamp());
+ LOGGER.info(sql);
return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
} else {
LOGGER.warn("unexpected column name: {}", columnName);