This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b28a0e72f0 Subscription: stop meta sync properly when no topic/consumer group (#12247)
8b28a0e72f0 is described below

commit 8b28a0e72f04197220d0d3e9712905b2508a79cf
Author: Zikun Ma <[email protected]>
AuthorDate: Thu Mar 28 15:34:27 2024 +0800

    Subscription: stop meta sync properly when no topic/consumer group (#12247)
---
 .../manager/pipe/coordinator/task/PipeTaskCoordinator.java    |  4 ----
 .../manager/subscription/SubscriptionCoordinator.java         |  5 +++++
 .../subscription/AbstractOperateSubscriptionProcedure.java    | 11 +++++++++++
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 55388822484..35c47e9441e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -58,10 +58,6 @@ public class PipeTaskCoordinator {
     this.pipeTaskCoordinatorLock = new PipeTaskCoordinatorLock();
   }
 
-  public PipeTaskCoordinatorLock getPipeTaskCoordinatorLock() {
-    return pipeTaskCoordinatorLock;
-  }
-
   /**
    * Lock the pipe task coordinator.
    *
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 85c4bf1c097..92383f3c3b0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -110,6 +110,11 @@ public class SubscriptionCoordinator {
     subscriptionMetaSyncer.stop();
   }
 
+  /** Caller should ensure that the method is called in the lock {@link #tryLock}. */
+  public void updateLastSyncedVersion() {
+    subscriptionInfo.updateLastSyncedVersion();
+  }
+
   public boolean canSkipNextSync() {
     return subscriptionInfo.canSkipNextSync();
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 2e70e0e39cc..5de2b7ddda7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime.ConsumerGroupMetaSyncProcedure;
+import org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime.TopicMetaSyncProcedure;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.state.subscription.OperateSubscriptionState;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
@@ -130,6 +132,15 @@ public abstract class AbstractOperateSubscriptionProcedure
           "ProcedureId {} release lock. No need to release subscription lock.", getProcId());
     } else {
       LOGGER.info("ProcedureId {} release lock. Subscription lock will be released.", getProcId());
+      if (this instanceof TopicMetaSyncProcedure
+          || this instanceof ConsumerGroupMetaSyncProcedure) {
+        LOGGER.info("Subscription meta sync procedure finished, updating last sync version.");
+        configNodeProcedureEnv
+            .getConfigManager()
+            .getSubscriptionManager()
+            .getSubscriptionCoordinator()
+            .updateLastSyncedVersion();
+      }
       configNodeProcedureEnv
           .getConfigManager()
           .getSubscriptionManager()

Reply via email to