This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 0e9ee216bd5 Subscription: Fixed multiple bugs (#17563) (#17599)
0e9ee216bd5 is described below
commit 0e9ee216bd5c5b8c3c74e6ec3b9b36de6e907e08
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:34:46 2026 +0800
Subscription: Fixed multiple bugs (#17563) (#17599)
---
.../subscription/CreateSubscriptionProcedure.java | 6 +-
.../subscription/DropSubscriptionProcedure.java | 6 +-
.../CreateSubscriptionProcedureTest.java | 134 +++++++++++++++++++++
.../DropSubscriptionProcedureTest.java | 99 +++++++++++++++
.../meta/consumer/ConsumerGroupMeta.java | 7 +-
.../consumer/ConsumerGroupDeSerTest.java | 20 +++
6 files changed, 266 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 80e4e511554..6d468908ed7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -87,6 +87,9 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
throws SubscriptionException {
LOGGER.info("CreateSubscriptionProcedure: executeFromValidate");
+ alterConsumerGroupProcedure = null;
+ createPipeProcedures = new ArrayList<>();
+
subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);
// Construct AlterConsumerGroupProcedure
@@ -160,8 +163,7 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
response = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
- if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && response.getSubStatusSize() > 0) {
+ if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new SubscriptionException(
String.format(
"Failed to create subscription with request %s on config nodes,
because %s",
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
index 6741a6c1e2a..6f668f29c5d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
@@ -85,6 +85,9 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
throws SubscriptionException {
LOGGER.info("DropSubscriptionProcedure: executeFromValidate");
+ alterConsumerGroupProcedure = null;
+ dropPipeProcedures = new ArrayList<>();
+
subscriptionInfo.get().validateBeforeUnsubscribe(unsubscribeReq);
// Construct AlterConsumerGroupProcedure
@@ -141,8 +144,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
response = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
- if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && response.getSubStatusSize() > 0) {
+ if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new SubscriptionException(
String.format(
"Failed to drop subscription with request %s on config nodes,
because %s",
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java
index 93d9941fbf3..e2a4d0615d8 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java
@@ -19,18 +19,36 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.PermissionManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import
org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
import
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.DataOutputStream;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -39,6 +57,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -102,4 +121,119 @@ public class CreateSubscriptionProcedureTest {
fail();
}
}
+
+ @Test
+ public void
executeFromOperateOnConfigNodesShouldFailOnTopLevelConsensusError() throws
Exception {
+ final CreateSubscriptionProcedure proc =
+ new CreateSubscriptionProcedure(
+ new TSubscribeReq(
+ "old_consumer", "test_consumer_group",
Collections.singleton("test_topic")));
+
proc.setAlterConsumerGroupProcedure(Mockito.mock(AlterConsumerGroupProcedure.class));
+
+ final CreatePipeProcedureV2 createPipeProcedure =
Mockito.mock(CreatePipeProcedureV2.class);
+ Mockito.when(createPipeProcedure.constructPlan())
+ .thenReturn(Mockito.mock(CreatePipePlanV2.class));
+
proc.setCreatePipeProcedures(Collections.singletonList(createPipeProcedure));
+
+ try {
+ proc.executeFromOperateOnConfigNodes(
+ mockConsensusFailureEnv(
+ new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage("consensus write failed")));
+ fail();
+ } catch (SubscriptionException e) {
+ Assert.assertTrue(e.getMessage().contains("Failed to create
subscription"));
+ }
+ }
+
+ @Test
+ public void executeFromValidateShouldResetCreatePipeProceduresOnRetry()
throws Exception {
+ final Map<String, String> consumerAttributes = new HashMap<>();
+ consumerAttributes.put("username", "user");
+ consumerAttributes.put("password", "password");
+
+ final ConsumerGroupMeta consumerGroupMeta =
+ new ConsumerGroupMeta(
+ "test_consumer_group", 1, new ConsumerMeta("old_consumer", 1,
consumerAttributes));
+ final TopicMeta topicMeta = new TopicMeta("test_topic", 1,
Collections.emptyMap());
+
+ final SubscriptionInfo subscriptionInfo =
Mockito.mock(SubscriptionInfo.class);
+ Mockito.when(subscriptionInfo.getConsumerGroupMeta("test_consumer_group"))
+ .thenReturn(consumerGroupMeta);
+
Mockito.when(subscriptionInfo.deepCopyConsumerGroupMeta("test_consumer_group"))
+ .thenAnswer(invocation -> consumerGroupMeta.deepCopy());
+ Mockito.when(
+ subscriptionInfo.isTopicSubscribedByConsumerGroup("test_topic",
"test_consumer_group"))
+ .thenReturn(false);
+
Mockito.when(subscriptionInfo.deepCopyTopicMeta("test_topic")).thenReturn(topicMeta);
+
+ final PipeTaskInfo pipeTaskInfo = Mockito.mock(PipeTaskInfo.class);
+
Mockito.when(pipeTaskInfo.checkBeforeCreatePipe(Mockito.any(TCreatePipeReq.class)))
+ .thenReturn(true);
+
+ final CreateSubscriptionProcedure proc =
+ new CreateSubscriptionProcedure(
+ new TSubscribeReq(
+ "old_consumer", "test_consumer_group",
Collections.singleton("test_topic")));
+ setField(proc, "subscriptionInfo", new
AtomicReference<>(subscriptionInfo));
+ setField(proc, "pipeTaskInfo", new AtomicReference<>(pipeTaskInfo));
+
+ final ConfigNodeProcedureEnv env = mockCreateSubscriptionValidationEnv();
+ proc.executeFromValidate(env);
+ Assert.assertEquals(1, proc.getCreatePipeProcedures().size());
+
+ proc.executeFromValidate(env);
+ Assert.assertEquals(1, proc.getCreatePipeProcedures().size());
+ }
+
+ private static ConfigNodeProcedureEnv mockConsensusFailureEnv(final TSStatus
response)
+ throws Exception {
+ final ConfigNodeProcedureEnv env =
Mockito.mock(ConfigNodeProcedureEnv.class);
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final ConsensusManager consensusManager =
Mockito.mock(ConsensusManager.class);
+
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+ Mockito.when(consensusManager.write(Mockito.any())).thenReturn(response);
+
+ return env;
+ }
+
+ private static ConfigNodeProcedureEnv mockCreateSubscriptionValidationEnv() {
+ final ConfigNodeProcedureEnv env =
Mockito.mock(ConfigNodeProcedureEnv.class);
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final PermissionManager permissionManager =
Mockito.mock(PermissionManager.class);
+ final PipeManager pipeManager = Mockito.mock(PipeManager.class);
+ final PipePluginCoordinator pipePluginCoordinator =
Mockito.mock(PipePluginCoordinator.class);
+ final PipePluginInfo pipePluginInfo = Mockito.mock(PipePluginInfo.class);
+ final LoadManager loadManager = Mockito.mock(LoadManager.class);
+
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+
Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager);
+ Mockito.when(configManager.getPipeManager()).thenReturn(pipeManager);
+
Mockito.when(pipeManager.getPipePluginCoordinator()).thenReturn(pipePluginCoordinator);
+
Mockito.when(pipePluginCoordinator.getPipePluginInfo()).thenReturn(pipePluginInfo);
+ Mockito.when(configManager.getLoadManager()).thenReturn(loadManager);
+
Mockito.when(loadManager.getRegionLeaderMap()).thenReturn(Collections.emptyMap());
+ Mockito.when(permissionManager.login4Pipe(Mockito.anyString(),
Mockito.any()))
+ .thenReturn("hashedPassword");
+
+ return env;
+ }
+
+ private static void setField(final Object target, final String fieldName,
final Object value)
+ throws Exception {
+ Class<?> clazz = target.getClass();
+ while (clazz != null) {
+ try {
+ final Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ return;
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ throw new NoSuchFieldException(fieldName);
+ }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java
index 9ecce2a522c..910648bbe51 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java
@@ -19,24 +19,37 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import
org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
import
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.DataOutputStream;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -91,4 +104,90 @@ public class DropSubscriptionProcedureTest {
fail();
}
}
+
+ @Test
+ public void
executeFromOperateOnConfigNodesShouldFailOnTopLevelConsensusError() throws
Exception {
+ final DropSubscriptionProcedure proc =
+ new DropSubscriptionProcedure(
+ new TUnsubscribeReq(
+ "old_consumer", "test_consumer_group",
Collections.singleton("test_topic")));
+
proc.setAlterConsumerGroupProcedure(Mockito.mock(AlterConsumerGroupProcedure.class));
+
+ final DropPipeProcedureV2 dropPipeProcedure =
Mockito.mock(DropPipeProcedureV2.class);
+ Mockito.when(dropPipeProcedure.getPipeName()).thenReturn("pipe_topic");
+ proc.setDropPipeProcedures(Collections.singletonList(dropPipeProcedure));
+
+ try {
+ proc.executeFromOperateOnConfigNodes(
+ mockConsensusFailureEnv(
+ new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage("consensus write failed")));
+ fail();
+ } catch (SubscriptionException e) {
+ Assert.assertTrue(e.getMessage().contains("Failed to drop
subscription"));
+ }
+ }
+
+ @Test
+ public void executeFromValidateShouldResetDropPipeProceduresOnRetry() throws
Exception {
+ final Map<String, String> consumerAttributes = new HashMap<>();
+ consumerAttributes.put("username", "user");
+ consumerAttributes.put("password", "password");
+
+ final ConsumerGroupMeta consumerGroupMeta =
+ new ConsumerGroupMeta(
+ "test_consumer_group", 1, new ConsumerMeta("old_consumer", 1,
consumerAttributes));
+ consumerGroupMeta.addSubscription("old_consumer",
Collections.singleton("test_topic"));
+
+ final SubscriptionInfo subscriptionInfo =
Mockito.mock(SubscriptionInfo.class);
+ Mockito.when(subscriptionInfo.getConsumerGroupMeta("test_consumer_group"))
+ .thenReturn(consumerGroupMeta);
+
Mockito.when(subscriptionInfo.deepCopyConsumerGroupMeta("test_consumer_group"))
+ .thenAnswer(invocation -> consumerGroupMeta.deepCopy());
+
+ final PipeTaskInfo pipeTaskInfo = Mockito.mock(PipeTaskInfo.class);
+
+ final DropSubscriptionProcedure proc =
+ new DropSubscriptionProcedure(
+ new TUnsubscribeReq(
+ "old_consumer", "test_consumer_group",
Collections.singleton("test_topic")));
+ setField(proc, "subscriptionInfo", new
AtomicReference<>(subscriptionInfo));
+ setField(proc, "pipeTaskInfo", new AtomicReference<>(pipeTaskInfo));
+
+ final ConfigNodeProcedureEnv env =
Mockito.mock(ConfigNodeProcedureEnv.class);
+ proc.executeFromValidate(env);
+ Assert.assertEquals(1, proc.getDropPipeProcedures().size());
+
+ proc.executeFromValidate(env);
+ Assert.assertEquals(1, proc.getDropPipeProcedures().size());
+ }
+
+ private static ConfigNodeProcedureEnv mockConsensusFailureEnv(final TSStatus
response)
+ throws Exception {
+ final ConfigNodeProcedureEnv env =
Mockito.mock(ConfigNodeProcedureEnv.class);
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final ConsensusManager consensusManager =
Mockito.mock(ConsensusManager.class);
+
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+ Mockito.when(consensusManager.write(Mockito.any())).thenReturn(response);
+
+ return env;
+ }
+
+ private static void setField(final Object target, final String fieldName,
final Object value)
+ throws Exception {
+ Class<?> clazz = target.getClass();
+ while (clazz != null) {
+ try {
+ final Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ return;
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ throw new NoSuchFieldException(fieldName);
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index 498f3427690..640d154937b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -69,8 +69,11 @@ public class ConsumerGroupMeta {
final ConsumerGroupMeta copied = new ConsumerGroupMeta();
copied.consumerGroupId = consumerGroupId;
copied.creationTime = creationTime;
- copied.topicNameToSubscribedConsumerIdSet =
- new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet);
+ copied.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
+ topicNameToSubscribedConsumerIdSet.forEach(
+ (topicName, subscribedConsumerIds) ->
+ copied.topicNameToSubscribedConsumerIdSet.put(
+ topicName, new HashSet<>(subscribedConsumerIds)));
copied.consumerIdToConsumerMeta = new
ConcurrentHashMap<>(consumerIdToConsumerMeta);
copied.topicNameToSubscriptionCreationTime =
new ConcurrentHashMap<>(topicNameToSubscriptionCreationTime);
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
index d41b3706cd1..9ef7191ba66 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
@@ -69,4 +69,24 @@ public class ConsumerGroupDeSerTest {
consumerGroupMeta.getConsumerGroupId(),
consumerGroupMeta2.getConsumerGroupId());
Assert.assertEquals(consumerGroupMeta.getCreationTime(),
consumerGroupMeta2.getCreationTime());
}
+
+ @Test
+ public void testDeepCopyShouldNotShareSubscribedConsumerSets() {
+ Map<String, String> consumerAttributes = new HashMap<>();
+ consumerAttributes.put("username", "user");
+ consumerAttributes.put("password", "password");
+
+ ConsumerGroupMeta consumerGroupMeta =
+ new ConsumerGroupMeta(
+ "test_consumer_group", 1, new ConsumerMeta("test_consumer1", 1,
consumerAttributes));
+ consumerGroupMeta.addSubscription("test_consumer1",
Collections.singleton("test_topic"));
+
+ ConsumerGroupMeta copiedConsumerGroupMeta = consumerGroupMeta.deepCopy();
+ copiedConsumerGroupMeta.removeSubscription(
+ "test_consumer1", Collections.singleton("test_topic"));
+
+ Assert.assertTrue(
+
consumerGroupMeta.getConsumersSubscribingTopic("test_topic").contains("test_consumer1"));
+
Assert.assertTrue(copiedConsumerGroupMeta.getConsumersSubscribingTopic("test_topic").isEmpty());
+ }
}