This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d05871213adc351d4c718c2a6fb0909b01d279ff Author: Cong Zhao <[email protected]> AuthorDate: Sun Apr 16 16:05:44 2023 +0800 [fix][broker] Ensure previous delayed index be removed from snapshotSegmentLastIndexTable & Make load operate asynchronous (#20086) (cherry picked from commit c4aec6661e795c46181dc1fa79282065fa875768) --- .../bucket/BucketDelayedDeliveryTracker.java | 106 ++++++++++++--------- .../PersistentDispatcherMultipleConsumers.java | 13 +-- .../bucket/BucketDelayedDeliveryTrackerTest.java | 30 ++++-- 3 files changed, 85 insertions(+), 64 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 6ead1e207b0..6678c6df254 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -21,7 +21,6 @@ package org.apache.pulsar.broker.delayed.bucket; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; -import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Range; @@ -84,7 +83,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private final int maxNumBuckets; - private long numberDelayedMessages; + private volatile long numberDelayedMessages; @Getter @VisibleForTesting @@ -102,6 +101,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private final BucketDelayedMessageIndexStats stats; + private CompletableFuture<Void> pendingLoad = null; + public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, @@ -269,7 +270,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker if (ex == null) { immutableBucket.setSnapshotSegments(null); immutableBucket.asyncUpdateSnapshotLength(); - log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), + log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(), immutableBucket.bucketKey()); stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create, @@ -529,17 +530,25 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker } @Override - public synchronized long getNumberOfDelayedMessages() { + public long getNumberOfDelayedMessages() { return numberDelayedMessages; } @Override - public synchronized long getBufferMemoryUsage() { + public long getBufferMemoryUsage() { return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); } @Override public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) { + if (!checkPendingOpDone()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", + dispatcher.getName()); + } + return Collections.emptyNavigableSet(); + } + long cutoffTime = getCutoffTime(); lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue); @@ -558,6 +567,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { + // All message of current snapshot segment are scheduled, try load next snapshot segment if (bucket.merging) { log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}", dispatcher.getName(), bucket.bucketKey()); @@ -569,26 +579,19 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); } - // All message of current snapshot segment are scheduled, load next snapshot segment - // TODO make it asynchronous and not blocking this process - try { - boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); - - if (!createFutureDone) { - log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", - dispatcher.getName(), bucket.bucketKey()); - break; - } - - if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { - immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); - bucket.asyncDeleteBucketSnapshot(stats); - continue; - } + boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); + if (!createFutureDone) { + log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", + dispatcher.getName(), bucket.bucketKey()); + break; + } - long loadStartTime = System.currentTimeMillis(); - stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); - bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { + long loadStartTime = System.currentTimeMillis(); + stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); + CompletableFuture<Void> loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() + .thenAccept(indexList -> { + synchronized (BucketDelayedDeliveryTracker.this) { + this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); if (CollectionUtils.isEmpty(indexList)) { immutableBuckets.asMapOfRanges() .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); @@ -603,31 +606,36 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), index.getEntryId()); } - }).whenComplete((__, ex) -> { - if (ex != null) { - // Back bucket state - bucket.setCurrentSegmentEntryId(preSegmentEntryId); - - log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); - - stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); - } else { - log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); - - stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, - System.currentTimeMillis() - loadStartTime); + } + }).whenComplete((__, ex) -> { + if (ex != null) { + // Back bucket state + bucket.setCurrentSegmentEntryId(preSegmentEntryId); + + log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); + + stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); + } else { + log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), + (preSegmentEntryId == bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1); + + stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, + System.currentTimeMillis() - loadStartTime); + } + synchronized (this) { + if (timeout != null) { + timeout.cancel(); } - }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS); - } catch (Exception e) { - // Ignore exception to reload this segment on the next schedule. - log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}", - dispatcher.getName(), bucket.bucketKey(), e); + timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS); + } + }); + + if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) { break; } } - snapshotSegmentLastIndexTable.remove(ledgerId, entryId); positions.add(new PositionImpl(ledgerId, entryId)); @@ -643,6 +651,14 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker return positions; } + private synchronized boolean checkPendingOpDone() { + if (pendingLoad == null || pendingLoad.isDone()) { + pendingLoad = null; + return true; + } + return false; + } + @Override public boolean shouldPauseAllDeliveries() { return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c60b4562bf1..81adda053e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -46,7 +46,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; @@ -1089,7 +1088,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } @Override - public synchronized long getNumberOfDelayedMessages() { + public long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @@ -1169,15 +1168,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul public long getDelayedTrackerMemoryUsage() { - if (delayedDeliveryTracker.isEmpty()) { - return 0; - } - - if (delayedDeliveryTracker.get() instanceof AbstractDelayedDeliveryTracker) { - return delayedDeliveryTracker.get().getBufferMemoryUsage(); - } - - return 0; + return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } public Map<String, TopicMetricBean> getBucketDelayedIndexStats() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 95234d688f6..39b3992fbd1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -39,6 +39,7 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -197,9 +198,11 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes }); assertTrue(tracker.hasMessageAvailable()); - Set<PositionImpl> scheduledMessages = tracker.getScheduledMessages(100); - - assertEquals(scheduledMessages.size(), 1); + Set<PositionImpl> scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker.getScheduledMessages(100)); + assertEquals(scheduledMessages.size(), 1); + }); tracker.addMessage(101, 101, 101 * 10); @@ -216,12 +219,15 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes clockTime.set(100 * 10); assertTrue(tracker2.hasMessageAvailable()); - scheduledMessages = tracker2.getScheduledMessages(70); + Set<PositionImpl> scheduledMessages2 = new TreeSet<>(); - assertEquals(scheduledMessages.size(), 70); + Awaitility.await().untilAsserted(() -> { + scheduledMessages2.addAll(tracker2.getScheduledMessages(70)); + assertEquals(scheduledMessages2.size(), 70); + }); int i = 31; - for (PositionImpl scheduledMessage : scheduledMessages) { + for (PositionImpl scheduledMessage : scheduledMessages2) { assertEquals(scheduledMessage, PositionImpl.get(i, i)); i++; } @@ -298,7 +304,11 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes clockTime.set(110 * 10); - NavigableSet<PositionImpl> scheduledMessages = tracker2.getScheduledMessages(110); + NavigableSet<PositionImpl> scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker2.getScheduledMessages(110)); + assertEquals(scheduledMessages.size(), 110); + }); for (int i = 1; i <= 110; i++) { PositionImpl position = scheduledMessages.pollFirst(); assertEquals(position, PositionImpl.get(i, i)); @@ -370,7 +380,11 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes assertEquals(tracker2.getScheduledMessages(100).size(), 0); - assertEquals(tracker2.getScheduledMessages(100).size(), delayedMessagesInSnapshotValue); + Set<PositionImpl> scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker2.getScheduledMessages(100)); + assertEquals(scheduledMessages.size(), delayedMessagesInSnapshotValue); + }); assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty()); assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());
