This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.4
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2e115f6a83f379bd82e382ec31a7db60749c87a5
Author: VGalaxies <[email protected]>
AuthorDate: Wed May 7 11:29:16 2025 +0800

    Subscription: mark topic as completed when there is no corresponding prefetching queue (#15433)
    
    (cherry picked from commit 3a9855efa523ea9e8f0d327b339d9ee961b6d4e1)
---
 .../it/dual/IoTDBSubscriptionTopicIT.java          | 72 ++++++++++++++++++++++
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 27 ++++++++
 .../agent/SubscriptionBrokerAgent.java             | 10 +++
 .../db/subscription/broker/SubscriptionBroker.java |  9 +++
 .../pipe/agent/task/meta/PipeStaticMeta.java       |  4 ++
 5 files changed, 122 insertions(+)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 4c3f43c5083..c3ef301fee2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -827,6 +827,78 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
   }
 
+  @Test
+  public void testSnapshotModeWithEmptyData() throws Exception {
+    // Create a snapshot-mode topic on the sender; this test never writes any data to it
+    final String topicName = "topic11";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final SubscriptionTreeSession session = new 
SubscriptionTreeSession(host, port)) {
+      session.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
+      session.createTopic(topicName, config);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    assertTopicCount(1);
+
+    // Subscribe on a separate thread and poll until the snapshot topic is reported as consumed
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionTreePullConsumer consumer =
+                  new SubscriptionTreePullConsumer.Builder()
+                      .host(host)
+                      .port(port)
+                      .consumerId("c1")
+                      .consumerGroupId("cg1")
+                      .autoCommit(true)
+                      .buildPullConsumer()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+
+                while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); 
// poll and ignore
+                }
+
+                // Exiting the loop represents passing the awaitility test, at 
this point the result
+                // of 'show subscription' is empty, so there is no need to 
explicitly unsubscribe.
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure: exceptions here are only printed, never rethrown
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getDisplayName()));
+    thread.start();
+
+    try {
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(
+          () -> {
+            // 'show subscription' must succeed and return an empty, non-null list
+            try (final SyncConfigNodeIServiceClient client =
+                (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+              final TShowSubscriptionResp showSubscriptionResp =
+                  client.showSubscription(new TShowSubscriptionReq());
+              Assert.assertEquals(
+                  RpcUtils.SUCCESS_STATUS.getCode(), 
showSubscriptionResp.status.getCode());
+              Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+              Assert.assertEquals(0, 
showSubscriptionResp.subscriptionInfoList.size());
+            }
+          });
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      thread.join();
+    }
+  }
+
   /////////////////////////////// utility ///////////////////////////////
 
   private void assertTopicCount(final int count) throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index df196eabaf8..9f657446f74 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -56,6 +57,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSc
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
+import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.SystemMetric;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
@@ -339,6 +341,16 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // Get the pipe meta first because it is removed after 
super#dropPipe(pipeName)
     final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
+    // Record whether there are pipe tasks before dropping the pipe
+    final boolean hasPipeTasks;
+    if (Objects.nonNull(pipeMeta)) {
+      final Map<Integer, PipeTask> pipeTaskMap =
+          pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+      hasPipeTasks = Objects.nonNull(pipeTaskMap) && !pipeTaskMap.isEmpty();
+    } else {
+      hasPipeTasks = false;
+    }
+
     if (!super.dropPipe(pipeName)) {
       return false;
     }
@@ -348,6 +360,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       final String taskId = pipeName + "_" + creationTime;
       PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
       
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
+      // When the pipe contains no pipe tasks, there is no corresponding 
prefetching queue for the
+      // subscribed pipe, so the subscription needs to be manually marked as 
completed.
+      if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
+        final String topicName =
+            pipeMeta
+                .getStaticMeta()
+                .getConnectorParameters()
+                .getString(PipeConnectorConstant.SINK_TOPIC_KEY);
+        final String consumerGroupId =
+            pipeMeta
+                .getStaticMeta()
+                .getConnectorParameters()
+                .getString(PipeConnectorConstant.SINK_CONSUMER_GROUP_KEY);
+        SubscriptionAgent.broker().updateCompletedTopicNames(consumerGroupId, 
topicName);
+      }
     }
 
     return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 057f31aa66b..b750834f4ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -184,6 +184,16 @@ public class SubscriptionBrokerAgent {
         .bindPrefetchingQueue(subtask.getTopicName(), 
subtask.getInputPendingQueue());
   }
 
+  /**
+   * Forwards a "topic completed" update to the broker registered for the given consumer group.
+   * When no broker is registered for that group, a warning is logged and nothing else happens.
+   *
+   * @param consumerGroupId id of the consumer group whose broker should be updated
+   * @param topicName name of the topic passed on to the broker
+   */
+  public void updateCompletedTopicNames(final String consumerGroupId, final String topicName) {
+    final SubscriptionBroker targetBroker =
+        consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+    if (Objects.nonNull(targetBroker)) {
+      targetBroker.updateCompletedTopicNames(topicName);
+      return;
+    }
+    // No broker for this consumer group: report it and leave all state untouched.
+    LOGGER.warn(
+        "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId);
+  }
+
   public void unbindPrefetchingQueue(final String consumerGroupId, final 
String topicName) {
     final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
     if (Objects.isNull(broker)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index f2ed903f74f..6106b60cabf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -375,6 +375,15 @@ public class SubscriptionBroker {
         brokerId);
   }
 
+  /**
+   * Records the given topic name in {@code completedTopicNames}, but only when the topic's mode
+   * equals {@link TopicConstant#MODE_SNAPSHOT_VALUE}; topics in any other mode are left untouched.
+   *
+   * @param topicName name of the topic to mark as completed
+   */
+  public void updateCompletedTopicNames(final String topicName) {
+    // Constant-first comparison: a null topic mode is treated as "not snapshot" instead of
+    // raising a NullPointerException.
+    if (TopicConstant.MODE_SNAPSHOT_VALUE.equals(
+        SubscriptionAgent.topic().getTopicMode(topicName))) {
+      completedTopicNames.put(topicName, topicName);
+    }
+  }
+
   public void unbindPrefetchingQueue(final String topicName) {
     final SubscriptionPrefetchingQueue prefetchingQueue =
         topicNameToPrefetchingQueue.get(topicName);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
index 5178aeb2643..92dd20c0778 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
@@ -230,6 +230,10 @@ public class PipeStaticMeta {
     return SUBSCRIPTION_PIPE_PREFIX + topicName + "_" + consumerGroupId;
   }
 
+  /**
+   * Tells whether a pipe name carries the subscription pipe prefix.
+   *
+   * @param pipeName pipe name to inspect; may be {@code null}
+   * @return {@code true} if {@code pipeName} is non-null and starts with {@code
+   *     SUBSCRIPTION_PIPE_PREFIX}, {@code false} otherwise
+   */
+  public static boolean isSubscriptionPipe(final String pipeName) {
+    if (Objects.isNull(pipeName)) {
+      return false;
+    }
+    return pipeName.startsWith(SUBSCRIPTION_PIPE_PREFIX);
+  }
+
   /////////////////////////////////  Tree & Table Isolation  
/////////////////////////////////
 
   public boolean visibleUnder(final boolean isTableModel) {

Reply via email to