This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch sub-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/sub-fix by this push:
     new 210a483ae6a Subscription: Fixed multiple problems (#17418)
210a483ae6a is described below

commit 210a483ae6af0cd41d7317e15c96e881637f7cdf
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 2 09:41:04 2026 +0800

    Subscription: Fixed multiple problems (#17418)
---
 .../persistence/subscription/SubscriptionInfo.java | 51 +++++++++++-----------
 .../agent/SubscriptionBrokerAgent.java             |  2 +-
 .../agent/SubscriptionConsumerAgent.java           |  1 -
 .../subscription/agent/SubscriptionTopicAgent.java | 21 +++++----
 .../meta/consumer/ConsumerGroupMeta.java           |  9 ++--
 5 files changed, 45 insertions(+), 39 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index 9a1c6acc72a..a2758906f5b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -220,26 +220,21 @@ public class SubscriptionInfo implements SnapshotProcessor {
     }
   }
 
-  public void validatePipePluginUsageByTopicInternal(String pipePluginName)
+  private void validatePipePluginUsageByTopicInternal(String pipePluginName)
       throws SubscriptionException {
-    acquireReadLock();
-    try {
-      topicMetaKeeper
-          .getAllTopicMeta()
-          .forEach(
-              meta -> {
-                if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
-                  final String exceptionMessage =
-                      String.format(
-                          "PipePlugin '%s' is already used by Topic '%s' as a processor.",
-                          pipePluginName, meta.getTopicName());
-                  LOGGER.warn(exceptionMessage);
-                  throw new SubscriptionException(exceptionMessage);
-                }
-              });
-    } finally {
-      releaseReadLock();
-    }
+    topicMetaKeeper
+        .getAllTopicMeta()
+        .forEach(
+            meta -> {
+              if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
+                final String exceptionMessage =
+                    String.format(
+                        "PipePlugin '%s' is already used by Topic '%s' as a processor.",
+                        pipePluginName, meta.getTopicName());
+                LOGGER.warn(exceptionMessage);
+                throw new SubscriptionException(exceptionMessage);
+              }
+            });
   }
 
   public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws SubscriptionException {
@@ -326,21 +321,25 @@ public class SubscriptionInfo implements SnapshotProcessor {
   public TSStatus alterTopic(AlterTopicPlan plan) {
     acquireWriteLock();
     try {
-      topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
-      topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta());
-      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      return alterTopicInternal(plan);
     } finally {
       releaseWriteLock();
     }
   }
 
-  public TSStatus alterMultipleTopics(AlterMultipleTopicsPlan plan) {
+  private TSStatus alterTopicInternal(final AlterTopicPlan plan) {
+    topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
+    topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta());
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  public TSStatus alterMultipleTopics(final AlterMultipleTopicsPlan plan) {
     acquireWriteLock();
     try {
-      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       status.setSubStatus(new ArrayList<>());
-      for (AlterTopicPlan subPlan : plan.getSubPlans()) {
-        TSStatus innerStatus = alterTopic(subPlan);
+      for (final AlterTopicPlan subPlan : plan.getSubPlans()) {
+        final TSStatus innerStatus = alterTopicInternal(subPlan);
         status.getSubStatus().add(innerStatus);
         if (innerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           status.setCode(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 510f8559bc1..00007f921b2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -279,7 +279,7 @@ public class SubscriptionBrokerAgent {
    */
   private static class Cache<T> {
 
-    private T value;
+    private volatile T value;
     private volatile boolean valid = false;
     private final Supplier<T> supplier;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index fee23cf6af4..4ee6b191a24 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -96,7 +96,6 @@ public class SubscriptionConsumerAgent {
 
     // if consumer group meta does not exist on local agent
     if (Objects.isNull(metaInAgent)) {
-      consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
       consumerGroupMetaKeeper.addConsumerGroupMeta(consumerGroupId, metaFromCoordinator);
       SubscriptionAgent.broker().createBrokerIfNotExist(consumerGroupId);
       return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
index 4c2bf5d0217..37cdaa72690 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
@@ -149,10 +149,12 @@ public class SubscriptionTopicAgent {
   public String getTopicFormat(final String topicName) {
     acquireReadLock();
     try {
-      return topicMetaKeeper
-          .getTopicMeta(topicName)
-          .getConfig()
-          .getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE);
+      return topicMetaKeeper.containsTopicMeta(topicName)
+          ? topicMetaKeeper
+              .getTopicMeta(topicName)
+              .getConfig()
+              .getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE)
+          : null;
     } finally {
       releaseReadLock();
     }
@@ -161,10 +163,12 @@ public class SubscriptionTopicAgent {
   public String getTopicMode(final String topicName) {
     acquireReadLock();
     try {
-      return topicMetaKeeper
-          .getTopicMeta(topicName)
-          .getConfig()
-          .getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE);
+      return topicMetaKeeper.containsTopicMeta(topicName)
+          ? topicMetaKeeper
+              .getTopicMeta(topicName)
+              .getConfig()
+              .getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE)
+          : null;
     } finally {
       releaseReadLock();
     }
@@ -174,6 +178,7 @@ public class SubscriptionTopicAgent {
     acquireReadLock();
     try {
       return topicNames.stream()
+          .filter(topicMetaKeeper::containsTopicMeta)
           .collect(
               Collectors.toMap(
                   topicName -> topicName,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index f8e486537f6..b316c5f155d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -33,6 +33,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -120,11 +121,13 @@ public class ConsumerGroupMeta {
 
   public void removeConsumer(final String consumerId) {
     consumerIdToConsumerMeta.remove(consumerId);
-    for (final Map.Entry<String, Set<String>> entry :
-        topicNameToSubscribedConsumerIdSet.entrySet()) {
+    final Iterator<Map.Entry<String, Set<String>>> iterator =
+        topicNameToSubscribedConsumerIdSet.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<String, Set<String>> entry = iterator.next();
       entry.getValue().remove(consumerId);
       if (entry.getValue().isEmpty()) {
-        topicNameToSubscribedConsumerIdSet.remove(entry.getKey());
+        iterator.remove();
       }
     }
   }

Reply via email to