This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dbd468bcf7022b44386968c3258f9b861e52ec5f Author: V_Galaxy <[email protected]> AuthorDate: Mon Jul 29 12:19:34 2024 +0800 Subscription: fix unexpected cancellation of workers during consumer startup & optimize server-side subscription logs & add synchronized modifier (#13032) (cherry picked from commit 99769d1a6a0a9e4aec5b5f4f0cd3d486b0d628c7) --- .../consumer/SubscriptionConsumer.java | 9 ++++--- .../consumer/SubscriptionPullConsumer.java | 6 +++-- .../consumer/SubscriptionPushConsumer.java | 8 +++++- .../agent/SubscriptionBrokerAgent.java | 2 ++ .../db/subscription/broker/SubscriptionBroker.java | 31 +++++++++++++--------- .../broker/SubscriptionPrefetchingQueue.java | 4 +-- .../broker/SubscriptionPrefetchingTabletQueue.java | 3 ++- .../broker/SubscriptionPrefetchingTsFileQueue.java | 3 ++- 8 files changed, 42 insertions(+), 24 deletions(-) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index 2c09fabb0c2..c1a89ded983 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -234,13 +234,14 @@ abstract class SubscriptionConsumer implements AutoCloseable { providers.releaseWriteLock(); } + // set isClosed to false before submitting workers + isClosed.set(false); + // submit heartbeat worker submitHeartbeatWorker(); // submit endpoints syncer submitEndpointsSyncer(); - - isClosed.set(false); } @Override @@ -1068,7 +1069,7 @@ abstract class SubscriptionConsumer implements AutoCloseable { /////////////////////////////// stringify /////////////////////////////// protected Map<String, String> coreReportMessage() { - Map<String, String> result = new HashMap<>(5); + final Map<String, String> result = new HashMap<>(5); result.put("consumerId", consumerId); result.put("consumerGroupId", consumerGroupId); result.put("isClosed", isClosed.toString()); @@ -1078,7 +1079,7 @@ abstract class SubscriptionConsumer implements AutoCloseable { } protected Map<String, String> allReportMessage() { - Map<String, String> result = new HashMap<>(10); + final Map<String, String> result = new HashMap<>(10); result.put("consumerId", consumerId); result.put("consumerGroupId", consumerGroupId); result.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs)); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java index 31148dc09fc..37734187404 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java @@ -111,12 +111,14 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer { super.open(); + // set isClosed to false before submitting workers + isClosed.set(false); + + // submit auto poll worker if enabling auto commit if (autoCommit) { uncommittedMessages = new ConcurrentSkipListMap<>(); submitAutoCommitWorker(); } - - isClosed.set(false); } @Override diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java index 5143958b89a..69cf13f9bef 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java @@ -51,6 +51,7 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer { private final AckStrategy ackStrategy; private final ConsumeListener consumeListener; + // avoid interval less than or equal to zero private final long autoPollIntervalMs; private final long autoPollTimeoutMs; @@ -115,8 +116,12 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer { } super.open(); - submitAutoPollWorker(); + + // set isClosed to false before submitting workers isClosed.set(false); + + // submit auto poll worker + submitAutoPollWorker(); } @Override @@ -289,6 +294,7 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer { } public Builder autoPollIntervalMs(final long autoPollIntervalMs) { + // avoid interval less than or equal to zero this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1); return this; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 87c7edfaf8a..538004a3ed8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -116,6 +116,7 @@ public class SubscriptionBrokerAgent { public void createBroker(final String consumerGroupId) { final SubscriptionBroker broker = new SubscriptionBroker(consumerGroupId); consumerGroupIdToSubscriptionBroker.put(consumerGroupId, broker); + LOGGER.info("Subscription: create broker bound to consumer group [{}]", consumerGroupId); } /** @@ -140,6 +141,7 @@ public class SubscriptionBrokerAgent { return false; } consumerGroupIdToSubscriptionBroker.remove(consumerGroupId); + LOGGER.info("Subscription: drop broker bound to consumer group [{}]", consumerGroupId); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index b9ae1ad7197..43acf12f180 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -64,10 +64,10 @@ public class SubscriptionBroker { final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { - LOGGER.warn( - "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", - topicName, - brokerId); + // There are two reasons for not printing logs here: + // 1. There will be a delay in the creation of the prefetching queue after subscription. + // 2. There is no corresponding prefetching queue on this DN (currently the consumer is + // fully connected to all DNs). continue; } // check if completed before closed @@ -174,9 +174,7 @@ public class SubscriptionBroker { public void bindPrefetchingQueue( final String topicName, final UnboundedBlockingPendingQueue<Event> inputPendingQueue) { - final SubscriptionPrefetchingQueue prefetchingQueue = - topicNameToPrefetchingQueue.get(topicName); - if (Objects.nonNull(prefetchingQueue)) { + if (Objects.nonNull(topicNameToPrefetchingQueue.get(topicName))) { LOGGER.warn( "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] has already existed", topicName, @@ -184,19 +182,22 @@ public class SubscriptionBroker { return; } final String topicFormat = SubscriptionAgent.topic().getTopicFormat(topicName); + final SubscriptionPrefetchingQueue prefetchingQueue; if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(topicFormat)) { - final SubscriptionPrefetchingQueue queue = + prefetchingQueue = new SubscriptionPrefetchingTsFileQueue( brokerId, topicName, new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue)); - SubscriptionPrefetchingQueueMetrics.getInstance().register(queue); - topicNameToPrefetchingQueue.put(topicName, queue); } else { - final SubscriptionPrefetchingQueue queue = + prefetchingQueue = new SubscriptionPrefetchingTabletQueue( brokerId, topicName, new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue)); - SubscriptionPrefetchingQueueMetrics.getInstance().register(queue); - topicNameToPrefetchingQueue.put(topicName, queue); } + SubscriptionPrefetchingQueueMetrics.getInstance().register(prefetchingQueue); + topicNameToPrefetchingQueue.put(topicName, prefetchingQueue); + LOGGER.info( + "Subscription: create prefetching queue bound to topic [{}] for consumer group [{}]", + topicName, + brokerId); } public void unbindPrefetchingQueue(final String topicName, final boolean doRemove) { @@ -230,6 +231,10 @@ public class SubscriptionBroker { // remove prefetching queue topicNameToPrefetchingQueue.remove(topicName); + LOGGER.info( + "Subscription: drop prefetching queue bound to topic [{}] for consumer group [{}]", + topicName, + brokerId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 83549212ed6..749bcf13530 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -389,7 +389,7 @@ public abstract class SubscriptionPrefetchingQueue { /////////////////////////////// stringify /////////////////////////////// protected Map<String, String> coreReportMessage() { - Map<String, String> result = new HashMap<>(6); + final Map<String, String> result = new HashMap<>(6); result.put("brokerId", brokerId); result.put("topicName", topicName); result.put("size of uncommittedEvents", String.valueOf(uncommittedEvents.size())); @@ -400,7 +400,7 @@ public abstract class SubscriptionPrefetchingQueue { } protected Map<String, String> allReportMessage() { - Map<String, String> result = new HashMap<>(8); + final Map<String, String> result = new HashMap<>(8); result.put("brokerId", brokerId); result.put("topicName", topicName); result.put("size of inputPendingQueue", String.valueOf(inputPendingQueue.size())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java index ddd38f50a59..1c327d2671d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java @@ -97,7 +97,8 @@ public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQ return onEventInternal(null); } - private boolean onEventInternal(@Nullable final EnrichedEvent event) { + // missing synchronization can cause IoTDBSubscriptionSharingIT to lose data + private synchronized boolean onEventInternal(@Nullable final EnrichedEvent event) { final AtomicBoolean result = new AtomicBoolean(false); currentBatchRef.getAndUpdate( (batch) -> { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index 2b70ea7d56a..f0604eca157 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -327,7 +327,8 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ return onEventInternal(null); } - private boolean onEventInternal(@Nullable final TabletInsertionEvent event) { + // missing synchronization can cause IoTDBSubscriptionSharingIT to lose data + private synchronized boolean onEventInternal(@Nullable final TabletInsertionEvent event) { final AtomicBoolean result = new AtomicBoolean(false); currentBatchRef.getAndUpdate( (batch) -> {
