This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8b28a0e72f0 Subscription: stop meta sync properly when no topic/consumer group (#12247)
8b28a0e72f0 is described below
commit 8b28a0e72f04197220d0d3e9712905b2508a79cf
Author: Zikun Ma <[email protected]>
AuthorDate: Thu Mar 28 15:34:27 2024 +0800
Subscription: stop meta sync properly when no topic/consumer group (#12247)
---
.../manager/pipe/coordinator/task/PipeTaskCoordinator.java | 4 ----
.../manager/subscription/SubscriptionCoordinator.java | 5 +++++
.../subscription/AbstractOperateSubscriptionProcedure.java | 11 +++++++++++
3 files changed, 16 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 55388822484..35c47e9441e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -58,10 +58,6 @@ public class PipeTaskCoordinator {
this.pipeTaskCoordinatorLock = new PipeTaskCoordinatorLock();
}
- public PipeTaskCoordinatorLock getPipeTaskCoordinatorLock() {
- return pipeTaskCoordinatorLock;
- }
-
/**
* Lock the pipe task coordinator.
*
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 85c4bf1c097..92383f3c3b0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -110,6 +110,11 @@ public class SubscriptionCoordinator {
subscriptionMetaSyncer.stop();
}
+ /** Caller should ensure that the method is called in the lock {@link #tryLock}. */
+ public void updateLastSyncedVersion() {
+ subscriptionInfo.updateLastSyncedVersion();
+ }
+
public boolean canSkipNextSync() {
return subscriptionInfo.canSkipNextSync();
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 2e70e0e39cc..5de2b7ddda7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime.ConsumerGroupMetaSyncProcedure;
+import org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime.TopicMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.subscription.OperateSubscriptionState;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
@@ -130,6 +132,15 @@ public abstract class AbstractOperateSubscriptionProcedure
"ProcedureId {} release lock. No need to release subscription lock.", getProcId());
} else {
LOGGER.info("ProcedureId {} release lock. Subscription lock will be released.", getProcId());
+ if (this instanceof TopicMetaSyncProcedure
+ || this instanceof ConsumerGroupMetaSyncProcedure) {
+ LOGGER.info("Subscription meta sync procedure finished, updating last sync version.");
+ configNodeProcedureEnv
+ .getConfigManager()
+ .getSubscriptionManager()
+ .getSubscriptionCoordinator()
+ .updateLastSyncedVersion();
+ }
configNodeProcedureEnv
.getConfigManager()
.getSubscriptionManager()