This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.4 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e115f6a83f379bd82e382ec31a7db60749c87a5 Author: VGalaxies <[email protected]> AuthorDate: Wed May 7 11:29:16 2025 +0800 Subscription: mark topic as completed when there is no corresponding prefetching queue (#15433) (cherry picked from commit 3a9855efa523ea9e8f0d327b339d9ee961b6d4e1) --- .../it/dual/IoTDBSubscriptionTopicIT.java | 72 ++++++++++++++++++++++ .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 27 ++++++++ .../agent/SubscriptionBrokerAgent.java | 10 +++ .../db/subscription/broker/SubscriptionBroker.java | 9 +++ .../pipe/agent/task/meta/PipeStaticMeta.java | 4 ++ 5 files changed, 122 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java index 4c3f43c5083..c3ef301fee2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java @@ -827,6 +827,78 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT { } } + @Test + public void testSnapshotModeWithEmptyData() throws Exception { + // Create topic + final String topicName = "topic11"; + final String host = senderEnv.getIP(); + final int port = Integer.parseInt(senderEnv.getPort()); + try (final SubscriptionTreeSession session = new SubscriptionTreeSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE); + session.createTopic(topicName, config); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + assertTopicCount(1); + + // Subscription + final Thread thread = + new Thread( + () -> { + try (final SubscriptionTreePullConsumer consumer = + new SubscriptionTreePullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c1") + .consumerGroupId("cg1") + .autoCommit(true) + .buildPullConsumer()) { + consumer.open(); + consumer.subscribe(topicName); + + while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) { + LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time + consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); // poll and ignore + } + + // Exiting the loop represents passing the awaitility test, at this point the result + // of 'show subscription' is empty, so there is no need to explicitly unsubscribe. + } catch (final Exception e) { + e.printStackTrace(); + // Avoid failure + } finally { + LOGGER.info("consumer exiting..."); + } + }, + String.format("%s - consumer", testName.getDisplayName())); + thread.start(); + + try { + // Keep retrying if there are execution failures + AWAIT.untilAsserted( + () -> { + // Check empty subscription + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final TShowSubscriptionResp showSubscriptionResp = + client.showSubscription(new TShowSubscriptionReq()); + Assert.assertEquals( + RpcUtils.SUCCESS_STATUS.getCode(), showSubscriptionResp.status.getCode()); + Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList); + Assert.assertEquals(0, showSubscriptionResp.subscriptionInfoList.size()); + } + }); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + thread.join(); + } + } + /////////////////////////////// utility /////////////////////////////// private void assertTopicCount(final int count) throws Exception { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index df196eabaf8..9f657446f74 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Tag; @@ -56,6 +57,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSc import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; +import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.SystemMetric; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; @@ -339,6 +341,16 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { // Get the pipe meta first because it is removed after super#dropPipe(pipeName) final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + // Record whether there are pipe tasks before dropping the pipe + final boolean hasPipeTasks; + if (Objects.nonNull(pipeMeta)) { + final Map<Integer, PipeTask> pipeTaskMap = + pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()); + hasPipeTasks = Objects.nonNull(pipeTaskMap) && !pipeTaskMap.isEmpty(); + } else { + hasPipeTasks = false; + } + if (!super.dropPipe(pipeName)) { return false; } @@ -348,6 +360,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final String taskId = pipeName + "_" + creationTime; PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId); + // When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the + // subscribed pipe, so the subscription needs to be manually marked as completed. + if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) { + final String topicName = + pipeMeta + .getStaticMeta() + .getConnectorParameters() + .getString(PipeConnectorConstant.SINK_TOPIC_KEY); + final String consumerGroupId = + pipeMeta + .getStaticMeta() + .getConnectorParameters() + .getString(PipeConnectorConstant.SINK_CONSUMER_GROUP_KEY); + SubscriptionAgent.broker().updateCompletedTopicNames(consumerGroupId, topicName); + } } return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 057f31aa66b..b750834f4ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -184,6 +184,16 @@ public class SubscriptionBrokerAgent { .bindPrefetchingQueue(subtask.getTopicName(), subtask.getInputPendingQueue()); } + public void updateCompletedTopicNames(final String consumerGroupId, final String topicName) { + final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); + if (Objects.isNull(broker)) { + LOGGER.warn( + "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); + return; + } + broker.updateCompletedTopicNames(topicName); + } + public void unbindPrefetchingQueue(final String consumerGroupId, final String topicName) { final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index f2ed903f74f..6106b60cabf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -375,6 +375,15 @@ public class SubscriptionBroker { brokerId); } + public void updateCompletedTopicNames(final String topicName) { + // mark topic name completed only for topic of snapshot mode + if (SubscriptionAgent.topic() + .getTopicMode(topicName) + .equals(TopicConstant.MODE_SNAPSHOT_VALUE)) { + completedTopicNames.put(topicName, topicName); + } + } + public void unbindPrefetchingQueue(final String topicName) { final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java index 5178aeb2643..92dd20c0778 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java @@ -230,6 +230,10 @@ public class PipeStaticMeta { return SUBSCRIPTION_PIPE_PREFIX + topicName + "_" + consumerGroupId; } + public static boolean isSubscriptionPipe(final String pipeName) { + return Objects.nonNull(pipeName) && pipeName.startsWith(SUBSCRIPTION_PIPE_PREFIX); + } + ///////////////////////////////// Tree & Table Isolation ///////////////////////////////// public boolean visibleUnder(final boolean isTableModel) {
