This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.4
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8d5e92dbd678ac996fd906033cc680ef2929f45c
Author: VGalaxies <[email protected]>
AuthorDate: Mon May 19 14:24:15 2025 +0800

    Subscription IT: add IoTDBSubscriptionTopicIT for table model (#15525)
    
    (cherry picked from commit cb19f370d4d15f09afd43314b4d7157b6707b2da)
---
 .github/workflows/pipe-it.yml                      |  90 ++++-
 .../iotdb/subscription/it/dual/tablemodel/.gitkeep |  18 -
 .../dual/tablemodel/IoTDBSubscriptionTopicIT.java  | 407 +++++++++++++++++++++
 3 files changed, 494 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml
index 2a41cdfdf5b..ba09fb4e27e 100644
--- a/.github/workflows/pipe-it.yml
+++ b/.github/workflows/pipe-it.yml
@@ -512,7 +512,91 @@ jobs:
         if: failure()
         uses: actions/upload-artifact@v4
         with:
-          name: cluster-log-subscription-java${{ matrix.java }}-${{ runner.os 
}}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
+          name: cluster-log-subscription-tree-arch-verification-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
+          path: integration-test/target/cluster-logs
+          retention-days: 30
+  subscription-table-arch-verification:
+    strategy:
+      fail-fast: false
+      max-parallel: 15
+      matrix:
+        java: [ 17 ]
+        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
+        cluster1: [ ScalableSingleNodeMode ]
+        cluster2: [ ScalableSingleNodeMode ]
+        os: [ ubuntu-latest ]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          distribution: liberica
+          java-version: ${{ matrix.java }}
+      - name: Cache Maven packages
+        uses: actions/cache@v4
+        with:
+          path: ~/.m2
+          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+          restore-keys: ${{ runner.os }}-m2-
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
+      - name: IT Test
+        shell: bash
+        # we do not compile client-cpp for saving time, it is tested in 
client.yml
+        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
+        run: |
+          retry() {
+            local -i max_attempts=3
+            local -i attempt=1
+            local -i retry_sleep=5
+            local test_output
+
+            while [ $attempt -le $max_attempts ]; do
+              mvn clean verify \
+              -P with-integration-tests \
+              -DskipUTs \
+              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
+              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
+              -pl integration-test \
+              -am -PMultiClusterIT2SubscriptionTableArchVerification \
+              -ntp >> ~/run-tests-$attempt.log && return 0
+              test_output=$(cat ~/run-tests-$attempt.log) 
+
+              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
+              echo "$test_output"
+              echo "==================== END: ~/run-tests-$attempt.log 
======================"
+
+              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
+                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
+              fi
+
+              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
+                if [ $attempt -lt $max_attempts ]; then
+                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
+                  sleep $retry_sleep
+                  attempt=$((attempt + 1))
+                else
+                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
+                  echo "Treating this as a success because the issue is likely 
transient."
+                  return 0
+                fi
+              elif [ $? -ne 0 ]; then
+                echo "Test failed with a different error."
+                return 1
+              else
+                echo "Tests passed"
+                return 0
+              fi
+            done
+          }
+          retry
+      - name: Upload Artifact
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: cluster-log-subscription-table-arch-verification-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
           path: integration-test/target/cluster-logs
           retention-days: 30
   subscription-tree-regression-consumer:
@@ -596,7 +680,7 @@ jobs:
         if: failure()
         uses: actions/upload-artifact@v4
         with:
-          name: cluster-log-subscription-regression-consumer-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
+          name: cluster-log-subscription-tree-regression-consumer-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
           path: integration-test/target/cluster-logs
           retention-days: 30
   subscription-tree-regression-misc:
@@ -680,7 +764,7 @@ jobs:
         if: failure()
         uses: actions/upload-artifact@v4
         with:
-          name: cluster-log-subscription-regression-misc-java${{ matrix.java 
}}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
+          name: cluster-log-subscription-tree-regression-misc-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
           path: integration-test/target/cluster-logs
           retention-days: 30
   dual-table-manual-basic:
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/.gitkeep
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/.gitkeep
deleted file mode 100644
index 585be9602fc..00000000000
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/.gitkeep
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
new file mode 100644
index 00000000000..d618bb95138
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.dual.tablemodel;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTableArchVerification;
+import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
+import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
+import 
org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
+import 
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.iotdb.subscription.it.dual.AbstractSubscriptionDualIT;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
+
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2SubscriptionTableArchVerification.class})
+public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionTopicIT.class);
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    // Shorten heartbeat and sync interval to avoid timeout of snapshot mode 
test
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(30);
+    
senderEnv.getConfig().getCommonConfig().setPipeMetaSyncerInitialSyncDelayMinutes(1);
+    
senderEnv.getConfig().getCommonConfig().setPipeMetaSyncerSyncIntervalMinutes(1);
+  }
+
+  @Test
+  public void testTabletTopicWithPath() throws Exception {
+    
testTopicWithPathTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+  }
+
+  @Test
+  public void testTsFileTopicWithPath() throws Exception {
+    testTopicWithPathTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+  }
+
+  private void testTopicWithPathTemplate(final String topicFormat) throws 
Exception {
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test2", "test2");
+    TableModelUtils.createDataBaseAndTable(senderEnv, "foo", "foo");
+
+    TableModelUtils.createDataBaseAndTable(receiverEnv, "test1", "test1");
+    TableModelUtils.createDataBaseAndTable(receiverEnv, "test2", "test2");
+    TableModelUtils.createDataBaseAndTable(receiverEnv, "foo", "foo");
+
+    // Insert some historical data on sender
+    TableModelUtils.insertData("test1", "test1", 0, 10, senderEnv);
+    TableModelUtils.insertData("test2", "test2", 0, 10, senderEnv);
+    TableModelUtils.insertData("foo", "foo", 0, 10, senderEnv);
+
+    // Create topic on sender
+    final String topicName = "topic1";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
+      config.put(TopicConstant.DATABASE_KEY, "test.*");
+      config.put(TopicConstant.TABLE_KEY, "test.*");
+      session.createTopic(topicName, config);
+    }
+    assertTopicCount(1);
+
+    // Subscribe on sender and insert on receiver
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final ISubscriptionTablePullConsumer consumer =
+                      new SubscriptionTablePullConsumerBuilder()
+                          .host(host)
+                          .port(port)
+                          .consumerId("c1")
+                          .consumerGroupId("cg1")
+                          .autoCommit(false)
+                          .build();
+                  final ITableSession session = 
receiverEnv.getTableSessionConnection()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+                while (!isClosed.get()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  final List<SubscriptionMessage> messages =
+                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                  insertData(messages, session);
+                  consumer.commitSync(messages);
+                }
+                consumer.unsubscribe(topicName);
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid fail
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getDisplayName()));
+    thread.start();
+
+    // Check data on receiver
+    try {
+      final Consumer<String> handleFailure =
+          o -> {
+            TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+            TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+          };
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(
+          () -> {
+            TableModelUtils.assertData("test1", "test1", 0, 10, receiverEnv, 
handleFailure);
+            TableModelUtils.assertData("test2", "test2", 0, 10, receiverEnv, 
handleFailure);
+          });
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
+
+  @Test
+  public void testTabletTopicWithTime() throws Exception {
+    
testTopicWithTimeTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+  }
+
+  @Test
+  public void testTsFileTopicWithTime() throws Exception {
+    testTopicWithTimeTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+  }
+
+  private void testTopicWithTimeTemplate(final String topicFormat) throws 
Exception {
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
+    TableModelUtils.createDataBaseAndTable(receiverEnv, "test1", "test1");
+
+    // Insert some historical data on sender
+    TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
+
+    // Create topic on sender
+    final String topicName = "topic2";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
+      config.put(TopicConstant.START_TIME_KEY, 25);
+      config.put(TopicConstant.END_TIME_KEY, 75);
+      session.createTopic(topicName, config);
+    }
+    assertTopicCount(1);
+
+    // Subscribe on sender and insert on receiver
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final ISubscriptionTablePullConsumer consumer =
+                      new SubscriptionTablePullConsumerBuilder()
+                          .host(host)
+                          .port(port)
+                          .consumerId("c1")
+                          .consumerGroupId("cg1")
+                          .autoCommit(false)
+                          .build();
+                  final ITableSession session = 
receiverEnv.getTableSessionConnection()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+                while (!isClosed.get()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  final List<SubscriptionMessage> messages =
+                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                  insertData(messages, session);
+                  consumer.commitSync(messages);
+                }
+                consumer.unsubscribe(topicName);
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid fail
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getDisplayName()));
+    thread.start();
+
+    // Check data on receiver
+    try {
+      final Consumer<String> handleFailure =
+          o -> {
+            TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+            TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+          };
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(
+          () -> TableModelUtils.assertData("test1", "test1", 25, 76, 
receiverEnv, handleFailure));
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
+
+  @Test
+  public void testTabletTopicWithSnapshotMode() throws Exception {
+    
testTopicWithSnapshotModeTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+  }
+
+  @Test
+  public void testTsFileTopicWithSnapshotMode() throws Exception {
+    
testTopicWithSnapshotModeTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+  }
+
+  private void testTopicWithSnapshotModeTemplate(final String topicFormat) 
throws Exception {
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
+    TableModelUtils.createDataBaseAndTable(receiverEnv, "test1", "test1");
+
+    // Insert some historical data on sender
+    TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
+
+    // Create topic
+    final String topicName = "topic3";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
+      session.createTopic(topicName, config);
+    }
+    assertTopicCount(1);
+
+    // Subscription
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final ISubscriptionTablePullConsumer consumer =
+                      new SubscriptionTablePullConsumerBuilder()
+                          .host(host)
+                          .port(port)
+                          .consumerId("c1")
+                          .consumerGroupId("cg1")
+                          .autoCommit(false)
+                          .build();
+                  final ITableSession session = 
receiverEnv.getTableSessionConnection()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+
+                // Insert some realtime data on sender
+                TableModelUtils.insertData("test1", "test1", 100, 200, 
senderEnv);
+
+                while (!isClosed.get()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  final List<SubscriptionMessage> messages =
+                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                  insertData(messages, session);
+                  consumer.commitSync(messages);
+                }
+
+                // Exiting the loop represents passing the awaitility test, at 
this point the result
+                // of 'show subscription' is empty, so there is no need to 
explicitly unsubscribe.
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getDisplayName()));
+    thread.start();
+
+    try {
+      final Consumer<String> handleFailure =
+          o -> {
+            TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+            TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+          };
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(
+          () -> {
+            // Check empty subscription
+            try (final SyncConfigNodeIServiceClient client =
+                (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+              final TShowSubscriptionResp showSubscriptionResp =
+                  client.showSubscription(new TShowSubscriptionReq());
+              Assert.assertEquals(
+                  RpcUtils.SUCCESS_STATUS.getCode(), 
showSubscriptionResp.status.getCode());
+              Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+              Assert.assertEquals(0, 
showSubscriptionResp.subscriptionInfoList.size());
+            }
+            // Check data
+            TableModelUtils.assertData("test1", "test1", 0, 100, receiverEnv, 
handleFailure);
+          });
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
+
+  /////////////////////////////// utility ///////////////////////////////
+
+  private void assertTopicCount(final int count) throws Exception {
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowTopicInfo> showTopicResult =
+          client.showTopic(new 
TShowTopicReq().setIsTableModel(true)).topicInfoList;
+      Assert.assertEquals(count, showTopicResult.size());
+    }
+  }
+
+  private void insertData(final List<SubscriptionMessage> messages, final 
ITableSession session)
+      throws Exception {
+    for (final SubscriptionMessage message : messages) {
+      final short messageType = message.getMessageType();
+      if (!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+        LOGGER.warn("unexpected message type: {}", messageType);
+        continue;
+      }
+      switch (SubscriptionMessageType.valueOf(messageType)) {
+        case SESSION_DATA_SETS_HANDLER:
+          for (final SubscriptionSessionDataSet dataSet : 
message.getSessionDataSetsHandler()) {
+            session.executeNonQueryStatement(
+                "use " + Objects.requireNonNull(dataSet.getDatabaseName()));
+            session.insert(dataSet.getTablet());
+          }
+          break;
+        case TS_FILE_HANDLER:
+          final SubscriptionTsFileHandler tsFileHandler = 
message.getTsFileHandler();
+          session.executeNonQueryStatement(
+              "use " + 
Objects.requireNonNull(tsFileHandler.getDatabaseName()));
+          session.executeNonQueryStatement(
+              String.format("load '%s'", 
tsFileHandler.getFile().getAbsolutePath()));
+          break;
+        default:
+          LOGGER.warn("unexpected message type: {}", messageType);
+          break;
+      }
+    }
+  }
+}

Reply via email to