This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch sub-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/sub-fix by this push:
new 210a483ae6a Subscription: Fixed multiple problems (#17418)
210a483ae6a is described below
commit 210a483ae6af0cd41d7317e15c96e881637f7cdf
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 2 09:41:04 2026 +0800
Subscription: Fixed multiple problems (#17418)
---
.../persistence/subscription/SubscriptionInfo.java | 51 +++++++++++-----------
.../agent/SubscriptionBrokerAgent.java | 2 +-
.../agent/SubscriptionConsumerAgent.java | 1 -
.../subscription/agent/SubscriptionTopicAgent.java | 21 +++++----
.../meta/consumer/ConsumerGroupMeta.java | 9 ++--
5 files changed, 45 insertions(+), 39 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index 9a1c6acc72a..a2758906f5b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -220,26 +220,21 @@ public class SubscriptionInfo implements
SnapshotProcessor {
}
}
- public void validatePipePluginUsageByTopicInternal(String pipePluginName)
+ private void validatePipePluginUsageByTopicInternal(String pipePluginName)
throws SubscriptionException {
- acquireReadLock();
- try {
- topicMetaKeeper
- .getAllTopicMeta()
- .forEach(
- meta -> {
- if
(pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
- final String exceptionMessage =
- String.format(
- "PipePlugin '%s' is already used by Topic '%s' as a
processor.",
- pipePluginName, meta.getTopicName());
- LOGGER.warn(exceptionMessage);
- throw new SubscriptionException(exceptionMessage);
- }
- });
- } finally {
- releaseReadLock();
- }
+ topicMetaKeeper
+ .getAllTopicMeta()
+ .forEach(
+ meta -> {
+ if
(pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
+ final String exceptionMessage =
+ String.format(
+ "PipePlugin '%s' is already used by Topic '%s' as a
processor.",
+ pipePluginName, meta.getTopicName());
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+ });
}
public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws
SubscriptionException {
@@ -326,21 +321,25 @@ public class SubscriptionInfo implements
SnapshotProcessor {
public TSStatus alterTopic(AlterTopicPlan plan) {
acquireWriteLock();
try {
- topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
- topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(),
plan.getTopicMeta());
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ return alterTopicInternal(plan);
} finally {
releaseWriteLock();
}
}
- public TSStatus alterMultipleTopics(AlterMultipleTopicsPlan plan) {
+ private TSStatus alterTopicInternal(final AlterTopicPlan plan) {
+ topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
+ topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(),
plan.getTopicMeta());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ public TSStatus alterMultipleTopics(final AlterMultipleTopicsPlan plan) {
acquireWriteLock();
try {
- TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ final TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setSubStatus(new ArrayList<>());
- for (AlterTopicPlan subPlan : plan.getSubPlans()) {
- TSStatus innerStatus = alterTopic(subPlan);
+ for (final AlterTopicPlan subPlan : plan.getSubPlans()) {
+ final TSStatus innerStatus = alterTopicInternal(subPlan);
status.getSubStatus().add(innerStatus);
if (innerStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
status.setCode(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 510f8559bc1..00007f921b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -279,7 +279,7 @@ public class SubscriptionBrokerAgent {
*/
private static class Cache<T> {
- private T value;
+ private volatile T value;
private volatile boolean valid = false;
private final Supplier<T> supplier;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index fee23cf6af4..4ee6b191a24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -96,7 +96,6 @@ public class SubscriptionConsumerAgent {
// if consumer group meta does not exist on local agent
if (Objects.isNull(metaInAgent)) {
- consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
consumerGroupMetaKeeper.addConsumerGroupMeta(consumerGroupId,
metaFromCoordinator);
SubscriptionAgent.broker().createBrokerIfNotExist(consumerGroupId);
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
index 4c2bf5d0217..37cdaa72690 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
@@ -149,10 +149,12 @@ public class SubscriptionTopicAgent {
public String getTopicFormat(final String topicName) {
acquireReadLock();
try {
- return topicMetaKeeper
- .getTopicMeta(topicName)
- .getConfig()
- .getStringOrDefault(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_DEFAULT_VALUE);
+ return topicMetaKeeper.containsTopicMeta(topicName)
+ ? topicMetaKeeper
+ .getTopicMeta(topicName)
+ .getConfig()
+ .getStringOrDefault(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_DEFAULT_VALUE)
+ : null;
} finally {
releaseReadLock();
}
@@ -161,10 +163,12 @@ public class SubscriptionTopicAgent {
public String getTopicMode(final String topicName) {
acquireReadLock();
try {
- return topicMetaKeeper
- .getTopicMeta(topicName)
- .getConfig()
- .getStringOrDefault(TopicConstant.MODE_KEY,
TopicConstant.MODE_DEFAULT_VALUE);
+ return topicMetaKeeper.containsTopicMeta(topicName)
+ ? topicMetaKeeper
+ .getTopicMeta(topicName)
+ .getConfig()
+ .getStringOrDefault(TopicConstant.MODE_KEY,
TopicConstant.MODE_DEFAULT_VALUE)
+ : null;
} finally {
releaseReadLock();
}
@@ -174,6 +178,7 @@ public class SubscriptionTopicAgent {
acquireReadLock();
try {
return topicNames.stream()
+ .filter(topicMetaKeeper::containsTopicMeta)
.collect(
Collectors.toMap(
topicName -> topicName,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index f8e486537f6..b316c5f155d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -33,6 +33,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -120,11 +121,13 @@ public class ConsumerGroupMeta {
public void removeConsumer(final String consumerId) {
consumerIdToConsumerMeta.remove(consumerId);
- for (final Map.Entry<String, Set<String>> entry :
- topicNameToSubscribedConsumerIdSet.entrySet()) {
+ final Iterator<Map.Entry<String, Set<String>>> iterator =
+ topicNameToSubscribedConsumerIdSet.entrySet().iterator();
+ while (iterator.hasNext()) {
+ final Map.Entry<String, Set<String>> entry = iterator.next();
entry.getValue().remove(consumerId);
if (entry.getValue().isEmpty()) {
- topicNameToSubscribedConsumerIdSet.remove(entry.getKey());
+ iterator.remove();
}
}
}