This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dbd468bcf7022b44386968c3258f9b861e52ec5f
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Jul 29 12:19:34 2024 +0800

    Subscription: fix unexpected cancellation of workers during consumer 
startup & optimize server-side subscription logs & add synchronized modifier 
(#13032)
    
    (cherry picked from commit 99769d1a6a0a9e4aec5b5f4f0cd3d486b0d628c7)
---
 .../consumer/SubscriptionConsumer.java             |  9 ++++---
 .../consumer/SubscriptionPullConsumer.java         |  6 +++--
 .../consumer/SubscriptionPushConsumer.java         |  8 +++++-
 .../agent/SubscriptionBrokerAgent.java             |  2 ++
 .../db/subscription/broker/SubscriptionBroker.java | 31 +++++++++++++---------
 .../broker/SubscriptionPrefetchingQueue.java       |  4 +--
 .../broker/SubscriptionPrefetchingTabletQueue.java |  3 ++-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  3 ++-
 8 files changed, 42 insertions(+), 24 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 2c09fabb0c2..c1a89ded983 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -234,13 +234,14 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
       providers.releaseWriteLock();
     }
 
+    // set isClosed to false before submitting workers
+    isClosed.set(false);
+
     // submit heartbeat worker
     submitHeartbeatWorker();
 
     // submit endpoints syncer
     submitEndpointsSyncer();
-
-    isClosed.set(false);
   }
 
   @Override
@@ -1068,7 +1069,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
   /////////////////////////////// stringify ///////////////////////////////
 
   protected Map<String, String> coreReportMessage() {
-    Map<String, String> result = new HashMap<>(5);
+    final Map<String, String> result = new HashMap<>(5);
     result.put("consumerId", consumerId);
     result.put("consumerGroupId", consumerGroupId);
     result.put("isClosed", isClosed.toString());
@@ -1078,7 +1079,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
   }
 
   protected Map<String, String> allReportMessage() {
-    Map<String, String> result = new HashMap<>(10);
+    final Map<String, String> result = new HashMap<>(10);
     result.put("consumerId", consumerId);
     result.put("consumerGroupId", consumerGroupId);
     result.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs));
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 31148dc09fc..37734187404 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -111,12 +111,14 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
 
     super.open();
 
+    // set isClosed to false before submitting workers
+    isClosed.set(false);
+
+    // submit auto poll worker if enabling auto commit
     if (autoCommit) {
       uncommittedMessages = new ConcurrentSkipListMap<>();
       submitAutoCommitWorker();
     }
-
-    isClosed.set(false);
   }
 
   @Override
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 5143958b89a..69cf13f9bef 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -51,6 +51,7 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
   private final AckStrategy ackStrategy;
   private final ConsumeListener consumeListener;
 
+  // avoid interval less than or equal to zero
   private final long autoPollIntervalMs;
   private final long autoPollTimeoutMs;
 
@@ -115,8 +116,12 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
     }
 
     super.open();
-    submitAutoPollWorker();
+
+    // set isClosed to false before submitting workers
     isClosed.set(false);
+
+    // submit auto poll worker
+    submitAutoPollWorker();
   }
 
   @Override
@@ -289,6 +294,7 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
     }
 
     public Builder autoPollIntervalMs(final long autoPollIntervalMs) {
+      // avoid interval less than or equal to zero
       this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1);
       return this;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 87c7edfaf8a..538004a3ed8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -116,6 +116,7 @@ public class SubscriptionBrokerAgent {
   public void createBroker(final String consumerGroupId) {
     final SubscriptionBroker broker = new SubscriptionBroker(consumerGroupId);
     consumerGroupIdToSubscriptionBroker.put(consumerGroupId, broker);
+    LOGGER.info("Subscription: create broker bound to consumer group [{}]", 
consumerGroupId);
   }
 
   /**
@@ -140,6 +141,7 @@ public class SubscriptionBrokerAgent {
       return false;
     }
     consumerGroupIdToSubscriptionBroker.remove(consumerGroupId);
+    LOGGER.info("Subscription: drop broker bound to consumer group [{}]", 
consumerGroupId);
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index b9ae1ad7197..43acf12f180 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -64,10 +64,10 @@ public class SubscriptionBroker {
       final SubscriptionPrefetchingQueue prefetchingQueue =
           topicNameToPrefetchingQueue.get(topicName);
       if (Objects.isNull(prefetchingQueue)) {
-        LOGGER.warn(
-            "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] does not exist",
-            topicName,
-            brokerId);
+        // There are two reasons for not printing logs here:
+        // 1. There will be a delay in the creation of the prefetching queue 
after subscription.
+        // 2. There is no corresponding prefetching queue on this DN 
(currently the consumer is
+        // fully connected to all DNs).
         continue;
       }
       // check if completed before closed
@@ -174,9 +174,7 @@ public class SubscriptionBroker {
 
   public void bindPrefetchingQueue(
       final String topicName, final UnboundedBlockingPendingQueue<Event> 
inputPendingQueue) {
-    final SubscriptionPrefetchingQueue prefetchingQueue =
-        topicNameToPrefetchingQueue.get(topicName);
-    if (Objects.nonNull(prefetchingQueue)) {
+    if (Objects.nonNull(topicNameToPrefetchingQueue.get(topicName))) {
       LOGGER.warn(
           "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] has already existed",
           topicName,
@@ -184,19 +182,22 @@ public class SubscriptionBroker {
       return;
     }
     final String topicFormat = 
SubscriptionAgent.topic().getTopicFormat(topicName);
+    final SubscriptionPrefetchingQueue prefetchingQueue;
     if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(topicFormat)) {
-      final SubscriptionPrefetchingQueue queue =
+      prefetchingQueue =
           new SubscriptionPrefetchingTsFileQueue(
               brokerId, topicName, new 
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
-      SubscriptionPrefetchingQueueMetrics.getInstance().register(queue);
-      topicNameToPrefetchingQueue.put(topicName, queue);
     } else {
-      final SubscriptionPrefetchingQueue queue =
+      prefetchingQueue =
           new SubscriptionPrefetchingTabletQueue(
               brokerId, topicName, new 
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
-      SubscriptionPrefetchingQueueMetrics.getInstance().register(queue);
-      topicNameToPrefetchingQueue.put(topicName, queue);
     }
+    
SubscriptionPrefetchingQueueMetrics.getInstance().register(prefetchingQueue);
+    topicNameToPrefetchingQueue.put(topicName, prefetchingQueue);
+    LOGGER.info(
+        "Subscription: create prefetching queue bound to topic [{}] for 
consumer group [{}]",
+        topicName,
+        brokerId);
   }
 
   public void unbindPrefetchingQueue(final String topicName, final boolean 
doRemove) {
@@ -230,6 +231,10 @@ public class SubscriptionBroker {
 
       // remove prefetching queue
       topicNameToPrefetchingQueue.remove(topicName);
+      LOGGER.info(
+          "Subscription: drop prefetching queue bound to topic [{}] for 
consumer group [{}]",
+          topicName,
+          brokerId);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 83549212ed6..749bcf13530 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -389,7 +389,7 @@ public abstract class SubscriptionPrefetchingQueue {
   /////////////////////////////// stringify ///////////////////////////////
 
   protected Map<String, String> coreReportMessage() {
-    Map<String, String> result = new HashMap<>(6);
+    final Map<String, String> result = new HashMap<>(6);
     result.put("brokerId", brokerId);
     result.put("topicName", topicName);
     result.put("size of uncommittedEvents", 
String.valueOf(uncommittedEvents.size()));
@@ -400,7 +400,7 @@ public abstract class SubscriptionPrefetchingQueue {
   }
 
   protected Map<String, String> allReportMessage() {
-    Map<String, String> result = new HashMap<>(8);
+    final Map<String, String> result = new HashMap<>(8);
     result.put("brokerId", brokerId);
     result.put("topicName", topicName);
     result.put("size of inputPendingQueue", 
String.valueOf(inputPendingQueue.size()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index ddd38f50a59..1c327d2671d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -97,7 +97,8 @@ public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQ
     return onEventInternal(null);
   }
 
-  private boolean onEventInternal(@Nullable final EnrichedEvent event) {
+  // missing synchronization can cause IoTDBSubscriptionSharingIT to lose data
+  private synchronized boolean onEventInternal(@Nullable final EnrichedEvent 
event) {
     final AtomicBoolean result = new AtomicBoolean(false);
     currentBatchRef.getAndUpdate(
         (batch) -> {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 2b70ea7d56a..f0604eca157 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -327,7 +327,8 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
     return onEventInternal(null);
   }
 
-  private boolean onEventInternal(@Nullable final TabletInsertionEvent event) {
+  // missing synchronization can cause IoTDBSubscriptionSharingIT to lose data
+  private synchronized boolean onEventInternal(@Nullable final 
TabletInsertionEvent event) {
     final AtomicBoolean result = new AtomicBoolean(false);
     currentBatchRef.getAndUpdate(
         (batch) -> {

Reply via email to