This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d6c9bbc88c8 Subscription: refactor `IoTDBSubscriptionConsumerGroupIT` to support multi-protocol pipe sync reference (#12288)
d6c9bbc88c8 is described below

commit d6c9bbc88c836e127b4b99c6b1e64e7d561259fd
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Apr 3 14:49:35 2024 +0800

    Subscription: refactor `IoTDBSubscriptionConsumerGroupIT` to support multi-protocol pipe sync reference (#12288)
---
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |   5 +
 .../it/dual/AbstractSubscriptionDualIT.java        |   1 +
 .../it/dual/IoTDBSubscriptionConsumerGroupIT.java  | 725 ++++++++++++++-------
 3 files changed, 504 insertions(+), 227 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index c7c72738d92..b9306937a96 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -59,6 +61,8 @@ import static org.junit.Assert.fail;
 
 public class TestUtils {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class);
+
   public static void prepareData(String[] sqls) {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
@@ -360,6 +364,7 @@ public class TestUtils {
       }
       String expected = new TreeMap<>(expectedHeaderWithResult).toString();
       String actual = new TreeMap<>(actualHeaderWithResult).toString();
+      LOGGER.info("[AssertSingleResultSetEqual] expected {}, actual {}", expected, actual);
       assertEquals(expected, actual);
       assertFalse(actualResultSet.next());
     } catch (Exception e) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 6f2dbd41230..d8ee2ecaf24 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -38,6 +38,7 @@ abstract class AbstractSubscriptionDualIT {
 
     senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
     receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+    receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);
 
     senderEnv.initClusterEnvironment();
     receiverEnv.initClusterEnvironment();
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 54d3867c99c..d5f2d09f34b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -32,9 +33,11 @@ import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
 import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.awaitility.Awaitility;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -45,11 +48,16 @@ import java.sql.Connection;
 import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.fail;
 
@@ -57,283 +65,555 @@ import static org.junit.Assert.fail;
 @Category({MultiClusterIT2Subscription.class})
 public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT {
 
+  // Test dimensions:
+  // 1. multi scenario of consumer, consumer group and subscribed topic
+  // 2. historical or realtime data
+  // 3. multi pipe sync protocol for reference
+
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBSubscriptionConsumerGroupIT.class);
 
-  // --------------------------- //
-  // Scenario 1: Historical Data //
-  // --------------------------- //
+  private static Map<String, String> ASYNC_CONNECTOR_ATTRIBUTES;
+  private static Map<String, String> SYNC_CONNECTOR_ATTRIBUTES;
+  private static Map<String, String> LEGACY_CONNECTOR_ATTRIBUTES;
+  private static Map<String, String> AIR_GAP_CONNECTOR_ATTRIBUTES;
+
+  private static Pair<List<SubscriptionInfo>, Map<String, String>> __3C_1CG_SUBSCRIBE_ONE_TOPIC;
+  private static Pair<List<SubscriptionInfo>, Map<String, String>> __3C_3CG_SUBSCRIBE_ONE_TOPIC;
+  private static Pair<List<SubscriptionInfo>, Map<String, String>> __3C_1CG_SUBSCRIBE_TWO_TOPIC;
+  private static Pair<List<SubscriptionInfo>, Map<String, String>> __3C_3CG_SUBSCRIBE_TWO_TOPIC;
+  private static Pair<List<SubscriptionInfo>, Map<String, String>> __4C_2CG_SUBSCRIBE_TWO_TOPIC;
+
+  static final class SubscriptionInfo {
+    final String consumerId;
+    final String consumerGroupId;
+    final Set<String> topicNames;
+
+    SubscriptionInfo(
+        final String consumerId, final String consumerGroupId, final Set<String> topicNames) {
+      this.consumerId = consumerId;
+      this.consumerGroupId = consumerGroupId;
+      this.topicNames = topicNames;
+    }
+  }
 
-  @Test
-  public void test3C1CGSubscribeOneTopicHistoricalData() throws Exception {
+  @Before
+  public void setUp() {
+    super.setUp();
+
+    // Setup connector attributes
+    ASYNC_CONNECTOR_ATTRIBUTES = new HashMap<>();
+    ASYNC_CONNECTOR_ATTRIBUTES.put("connector", "iotdb-thrift-async-connector");
+    ASYNC_CONNECTOR_ATTRIBUTES.put("connector.ip", receiverEnv.getIP());
+    ASYNC_CONNECTOR_ATTRIBUTES.put("connector.port", receiverEnv.getPort());
+
+    SYNC_CONNECTOR_ATTRIBUTES = new HashMap<>();
+    SYNC_CONNECTOR_ATTRIBUTES.put("connector", "iotdb-thrift-sync-connector");
+    SYNC_CONNECTOR_ATTRIBUTES.put("connector.ip", receiverEnv.getIP());
+    SYNC_CONNECTOR_ATTRIBUTES.put("connector.port", receiverEnv.getPort());
+
+    LEGACY_CONNECTOR_ATTRIBUTES = new HashMap<>();
+    LEGACY_CONNECTOR_ATTRIBUTES.put("connector", "iotdb-legacy-pipe-connector");
+    LEGACY_CONNECTOR_ATTRIBUTES.put("connector.ip", receiverEnv.getIP());
+    LEGACY_CONNECTOR_ATTRIBUTES.put("connector.port", receiverEnv.getPort());
+
+    final StringBuilder nodeUrlsBuilder = new StringBuilder();
+    for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) {
+      // Use default port for convenience
+      nodeUrlsBuilder
+          .append(wrapper.getIp())
+          .append(":")
+          .append(wrapper.getPipeAirGapReceiverPort())
+          .append(",");
+    }
+    AIR_GAP_CONNECTOR_ATTRIBUTES = new HashMap<>();
+    AIR_GAP_CONNECTOR_ATTRIBUTES.put("connector", "iotdb-air-gap-connector");
+    AIR_GAP_CONNECTOR_ATTRIBUTES.put("connector.node-urls", nodeUrlsBuilder.toString());
+
+    // Setup subscription info list with expected results
+    {
+      final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+      subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1", Collections.singleton("topic1")));
+      subscriptionInfoList.add(new SubscriptionInfo("c2", "cg1", Collections.singleton("topic1")));
+      subscriptionInfoList.add(new SubscriptionInfo("c3", "cg1", Collections.singleton("topic1")));
+
+      final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+      expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+      __3C_1CG_SUBSCRIBE_ONE_TOPIC = new Pair<>(subscriptionInfoList, expectedHeaderWithResult);
+    }
+    {
+      final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+      subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1", 
Collections.singleton("topic1")));
+      subscriptionInfoList.add(new SubscriptionInfo("c2", "cg2", 
Collections.singleton("topic1")));
+      subscriptionInfoList.add(new SubscriptionInfo("c3", "cg3", 
Collections.singleton("topic1")));
+
+      final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+      expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.cg2.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.cg3.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+      __3C_3CG_SUBSCRIBE_ONE_TOPIC = new Pair<>(subscriptionInfoList, 
expectedHeaderWithResult);
+    }
+    {
+      final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+      subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1", 
Collections.singleton("topic1")));
+      subscriptionInfoList.add(
+          new SubscriptionInfo("c2", "cg1", new 
HashSet<>(Arrays.asList("topic1", "topic2"))));
+      subscriptionInfoList.add(new SubscriptionInfo("c3", "cg1", 
Collections.singleton("topic2")));
+
+      final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+      expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.cg1.topic2.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+      __3C_1CG_SUBSCRIBE_TWO_TOPIC = new Pair<>(subscriptionInfoList, 
expectedHeaderWithResult);
+    }
+    {
+      final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+      subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1", 
Collections.singleton("topic1")));
+      subscriptionInfoList.add(
+          new SubscriptionInfo("c2", "cg2", new 
HashSet<>(Arrays.asList("topic1", "topic2"))));
+      subscriptionInfoList.add(new SubscriptionInfo("c3", "cg3", 
Collections.singleton("topic2")));
+
+      final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+      expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.cg2.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.cg2.topic2.s)", "100");
+      expectedHeaderWithResult.put("count(root.cg3.topic2.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+      __3C_3CG_SUBSCRIBE_TWO_TOPIC = new Pair<>(subscriptionInfoList, 
expectedHeaderWithResult);
+    }
+    {
+      final List<SubscriptionInfo> subscriptionInfoList = new ArrayList<>();
+      subscriptionInfoList.add(new SubscriptionInfo("c1", "cg1", 
Collections.singleton("topic1")));
+      subscriptionInfoList.add(
+          new SubscriptionInfo("c2", "cg2", new 
HashSet<>(Arrays.asList("topic1", "topic2"))));
+      subscriptionInfoList.add(new SubscriptionInfo("c3", "cg1", 
Collections.singleton("topic1")));
+      subscriptionInfoList.add(new SubscriptionInfo("c4", "cg2", 
Collections.singleton("topic2")));
+
+      final Map<String, String> expectedHeaderWithResult = new HashMap<>();
+      expectedHeaderWithResult.put("count(root.cg1.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.cg2.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.cg2.topic2.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic1.s)", "100");
+      expectedHeaderWithResult.put("count(root.topic2.s)", "100");
+
+      __4C_2CG_SUBSCRIBE_TWO_TOPIC = new Pair<>(subscriptionInfoList, 
expectedHeaderWithResult);
+    }
+  }
+
+  private void testSubscriptionHistoricalDataTemplate(
+      final Map<String, String> connectorAttributes,
+      final List<SubscriptionInfo> subscriptionInfoList,
+      final Map<String, String> expectedHeaderWithResult)
+      throws Exception {
     final long currentTime = System.currentTimeMillis();
 
-    // Historical data
+    // Insert some historical data
     insertData(currentTime);
 
+    // Create topic 'topic1' and 'topic2'
     createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
 
+    // Create pipe 'sync_topic1' and 'sync_topic2' with given connector 
attributes
+    createPipes(currentTime, connectorAttributes);
+
+    // Create subscription and check result
     pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+        subscriptionInfoList.stream()
+            .map(
+                (info) -> {
+                  try {
+                    return createConsumerAndSubscribeTopics(info);
+                  } catch (final Exception e) {
+                    e.printStackTrace();
+                    fail(e.getMessage());
+                    return null;
+                  }
+                })
+            .collect(Collectors.toList()),
+        expectedHeaderWithResult);
   }
 
-  @Test
-  public void test3C3CGSubscribeOneTopicHistoricalData() throws Exception {
+  private void testSubscriptionRealtimeDataTemplate(
+      final Map<String, String> connectorAttributes,
+      final List<SubscriptionInfo> subscriptionInfoList,
+      final Map<String, String> expectedHeaderWithResult)
+      throws Exception {
     final long currentTime = System.currentTimeMillis();
 
-    // Historical data
-    insertData(currentTime);
-
+    // Create topic 'topic1' and 'topic2'
     createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic1"));
 
+    // Create pipe 'sync_topic1' and 'sync_topic2' with given connector 
attributes
+    createPipes(currentTime, connectorAttributes);
+
+    // Insert some realtime data
+    insertData(currentTime);
+
+    // Create subscription and check result
     pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.cg2.topic1.s)", "100");
-            put("count(root.cg3.topic1.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+        subscriptionInfoList.stream()
+            .map(
+                (info) -> {
+                  try {
+                    return createConsumerAndSubscribeTopics(info);
+                  } catch (final Exception e) {
+                    e.printStackTrace();
+                    fail(e.getMessage());
+                    return null;
+                  }
+                })
+            .collect(Collectors.toList()),
+        expectedHeaderWithResult);
   }
 
+  // -------------------------------------- //
+  // 3 consumers, 1 consumer group, 1 topic //
+  // -------------------------------------- //
+
   @Test
-  public void test3C1CGSubscribeTwoTopicHistoricalData() throws Exception {
-    final long currentTime = System.currentTimeMillis();
+  public void test3C1CGSubscribeOneTopicHistoricalDataWithAsyncConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    // Historical data
-    insertData(currentTime);
+  @Test
+  public void test3C1CGSubscribeOneTopicHistoricalDataWithSyncConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1", 
"topic2"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic2"));
+  @Test
+  public void test3C1CGSubscribeOneTopicHistoricalDataWithLegacyConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.cg1.topic2.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+  @Test
+  public void test3C1CGSubscribeOneTopicHistoricalDataWithAirGapConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
   }
 
   @Test
-  public void test3C3CGSubscribeTwoTopicHistoricalData() throws Exception {
-    final long currentTime = System.currentTimeMillis();
+  public void test3C1CGSubscribeOneTopicRealtimeDataWithAsyncConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    // Historical data
-    insertData(currentTime);
+  @Test
+  public void test3C1CGSubscribeOneTopicRealtimeDataWithSyncConnector() throws 
Exception {
+    testSubscriptionRealtimeDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1", 
"topic2"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic2"));
+  @Test
+  public void test3C1CGSubscribeOneTopicRealtimeDataWithLegacyConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.cg2.topic1.s)", "100");
-            put("count(root.cg2.topic2.s)", "100");
-            put("count(root.cg3.topic2.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+  @Test
+  public void test3C1CGSubscribeOneTopicRealtimeDataWithAirGapConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_ONE_TOPIC.right);
   }
 
+  // --------------------------------------- //
+  // 3 consumers, 3 consumer groups, 1 topic //
+  // --------------------------------------- //
+
   @Test
-  public void test4C2CGSubscribeTwoTopicHistoricalData() throws Exception {
-    final long currentTime = System.currentTimeMillis();
+  public void test3C3CGSubscribeOneTopicHistoricalDataWithAsyncConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    // Historical data
-    insertData(currentTime);
+  @Test
+  public void test3C3CGSubscribeOneTopicHistoricalDataWithSyncConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1", 
"topic2"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c4", "cg2", "topic2"));
+  @Test
+  public void test3C3CGSubscribeOneTopicHistoricalDataWithLegacyConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.cg2.topic1.s)", "100");
-            put("count(root.cg2.topic2.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+  @Test
+  public void test3C3CGSubscribeOneTopicHistoricalDataWithAirGapConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
   }
 
-  // --------------------------- //
-  // Scenario 2: Realtime Data //
-  // --------------------------- //
+  @Test
+  public void test3C3CGSubscribeOneTopicRealtimeDataWithAsyncConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
   @Test
-  public void test3C1CGSubscribeOneTopicRealtimeData() throws Exception {
-    final long currentTime = System.currentTimeMillis();
+  public void test3C3CGSubscribeOneTopicRealtimeDataWithSyncConnector() throws 
Exception {
+    testSubscriptionRealtimeDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
+  @Test
+  public void test3C3CGSubscribeOneTopicRealtimeDataWithLegacyConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    // Realtime data
-    insertData(currentTime);
+  @Test
+  public void test3C3CGSubscribeOneTopicRealtimeDataWithAirGapConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_ONE_TOPIC.right);
+  }
 
-    pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+  // --------------------------------------- //
+  // 3 consumers, 1 consumer group, 2 topics //
+  // --------------------------------------- //
+
+  @Test
+  public void test3C1CGSubscribeTwoTopicHistoricalDataWithAsyncConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
   }
 
   @Test
-  public void test3C3CGSubscribeOneTopicRealtimeData() throws Exception {
-    final long currentTime = System.currentTimeMillis();
+  public void test3C1CGSubscribeTwoTopicHistoricalDataWithSyncConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic1"));
+  @Test
+  public void test3C1CGSubscribeTwoTopicHistoricalDataWithLegacyConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    // Realtime data
-    insertData(currentTime);
+  @Test
+  public void test3C1CGSubscribeTwoTopicHistoricalDataWithAirGapConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.cg2.topic1.s)", "100");
-            put("count(root.cg3.topic1.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+  @Test
+  public void test3C1CGSubscribeTwoTopicRealtimeDataWithAsyncConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
   }
 
   @Test
-  public void test3C1CGSubscribeTwoTopicRealtimeData() throws Exception {
-    final long currentTime = System.currentTimeMillis();
+  public void test3C1CGSubscribeTwoTopicRealtimeDataWithSyncConnector() throws 
Exception {
+    testSubscriptionRealtimeDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1", 
"topic2"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic2"));
+  @Test
+  public void test3C1CGSubscribeTwoTopicRealtimeDataWithLegacyConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    // Realtime data
-    insertData(currentTime);
+  @Test
+  public void test3C1CGSubscribeTwoTopicRealtimeDataWithAirGapConnector() 
throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_1CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.cg1.topic2.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+  // ---------------------------------------- //
+  // 3 consumers, 3 consumer groups, 2 topics //
+  // ---------------------------------------- //
+
+  @Test
+  public void test3C3CGSubscribeTwoTopicHistoricalDataWithAsyncConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
   }
 
   @Test
-  public void test3C3CGSubscribeTwoTopicRealtimeData() throws Exception {
-    final long currentTime = System.currentTimeMillis();
+  public void test3C3CGSubscribeTwoTopicHistoricalDataWithSyncConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1", 
"topic2"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic2"));
+  @Test
+  public void test3C3CGSubscribeTwoTopicHistoricalDataWithLegacyConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    // Realtime data
-    insertData(currentTime);
+  @Test
+  public void test3C3CGSubscribeTwoTopicHistoricalDataWithAirGapConnector() 
throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.cg2.topic1.s)", "100");
-            put("count(root.cg2.topic2.s)", "100");
-            put("count(root.cg3.topic2.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+  @Test
+  public void test3C3CGSubscribeTwoTopicRealtimeDataWithAsyncConnector() throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
   }
 
   @Test
-  public void test4C2CGSubscribeTwoTopicRealtimeData() throws Exception {
-    final long currentTime = System.currentTimeMillis();
+  public void test3C3CGSubscribeTwoTopicRealtimeDataWithSyncConnector() throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    createTopics(currentTime);
-    createPipes(currentTime);
-    final List<SubscriptionPullConsumer> consumers = new ArrayList<>();
-    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1", "topic2"));
-    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
-    consumers.add(createConsumerAndSubscribeTopics("c4", "cg2", "topic2"));
-
-    // Realtime data
-    insertData(currentTime);
+  @Test
+  public void test3C3CGSubscribeTwoTopicRealtimeDataWithLegacyConnector() throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
 
-    pollMessagesAndCheck(
-        consumers,
-        new HashMap<String, String>() {
-          {
-            put("count(root.cg1.topic1.s)", "100");
-            put("count(root.cg2.topic1.s)", "100");
-            put("count(root.cg2.topic2.s)", "100");
-            put("count(root.topic1.s)", "100");
-            put("count(root.topic2.s)", "100");
-          }
-        });
+  @Test
+  public void test3C3CGSubscribeTwoTopicRealtimeDataWithAirGapConnector() throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.left,
+        __3C_3CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
+
+  // ---------------------------------------- //
+  // 4 consumers, 2 consumer groups, 2 topics //
+  // ---------------------------------------- //
+
+  @Test
+  public void test4C2CGSubscribeTwoTopicHistoricalDataWithAsyncConnector() throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
+
+  @Test
+  public void test4C2CGSubscribeTwoTopicHistoricalDataWithSyncConnector() throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
+
+  @Test
+  public void test4C2CGSubscribeTwoTopicHistoricalDataWithLegacyConnector() throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
+
+  @Test
+  public void test4C2CGSubscribeTwoTopicHistoricalDataWithAirGapConnector() throws Exception {
+    testSubscriptionHistoricalDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
+
+  @Test
+  public void test4C2CGSubscribeTwoTopicRealtimeDataWithAsyncConnector() throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        ASYNC_CONNECTOR_ATTRIBUTES,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
+
+  @Test
+  public void test4C2CGSubscribeTwoTopicRealtimeDataWithSyncConnector() throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        SYNC_CONNECTOR_ATTRIBUTES,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
+
+  @Test
+  public void test4C2CGSubscribeTwoTopicRealtimeDataWithLegacyConnector() throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        LEGACY_CONNECTOR_ATTRIBUTES,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
+  }
+
+  @Test
+  public void test4C2CGSubscribeTwoTopicRealtimeDataWithAirGapConnector() throws Exception {
+    testSubscriptionRealtimeDataTemplate(
+        AIR_GAP_CONNECTOR_ATTRIBUTES,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.left,
+        __4C_2CG_SUBSCRIBE_TWO_TOPIC.right);
   }
 
   /////////////////////////////// utility ///////////////////////////////
@@ -368,17 +648,12 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
     }
   }
 
-  private void createPipes(final long currentTime) {
+  private void createPipes(final long currentTime, final Map<String, String> connectorAttributes) {
     // For sync reference
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
       final Map<String, String> extractorAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
-
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.ip", receiverEnv.getIP());
-      connectorAttributes.put("connector.port", receiverEnv.getPort());
 
       extractorAttributes.put("inclusion", "data.insert");
       extractorAttributes.put("inclusion.exclusion", "data.delete");
@@ -399,11 +674,6 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
       final Map<String, String> extractorAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
-
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.ip", receiverEnv.getIP());
-      connectorAttributes.put("connector.port", receiverEnv.getPort());
 
       extractorAttributes.put("inclusion", "data.insert");
       extractorAttributes.put("inclusion.exclusion", "data.delete");
@@ -422,18 +692,17 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
   }
 
   private SubscriptionPullConsumer createConsumerAndSubscribeTopics(
-      final String consumerId, final String consumerGroupId, final String... topicNames)
-      throws Exception {
+      final SubscriptionInfo subscriptionInfo) throws Exception {
     final SubscriptionPullConsumer consumer =
         new SubscriptionPullConsumer.Builder()
             .host(senderEnv.getIP())
             .port(Integer.parseInt(senderEnv.getPort()))
-            .consumerId(consumerId)
-            .consumerGroupId(consumerGroupId)
+            .consumerId(subscriptionInfo.consumerId)
+            .consumerGroupId(subscriptionInfo.consumerGroupId)
             .autoCommit(false)
             .buildPullConsumer();
     consumer.open();
-    consumer.subscribe(topicNames);
+    consumer.subscribe(subscriptionInfo.topicNames);
     return consumer;
   }
 
@@ -540,12 +809,14 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
           String.format(
               "insert into root.%s.topic1(time, s) values (%s, 1)",
               consumerGroupId, record.getTimestamp());
+      LOGGER.info(sql);
       return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
     } else if ("root.topic2.s".equals(columnName)) {
       final String sql =
           String.format(
               "insert into root.%s.topic2(time, s) values (%s, 1)",
               consumerGroupId, record.getTimestamp());
+      LOGGER.info(sql);
       return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
     } else {
       LOGGER.warn("unexpected column name: {}", columnName);


Reply via email to