Copilot commented on code in PR #24927: URL: https://github.com/apache/pulsar/pull/24927#discussion_r2483706386
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java: ########## @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.LongIterator; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory 
implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Prune throttling + // Last pruning time + private final AtomicLong lastPruneNanos = new AtomicLong(0); + // Minimum interval between prunes + private final long minPruneIntervalNanos; + + // Fixed-delay detection (parity with legacy behavior) + private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0); + private volatile boolean messagesHaveFixedDelay = true; + + // Per-bucket locks (timestamp -> lock) for fine-grained concurrency + private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>(); + + // 
Timer state guard + private final ReentrantLock timerLock = new ReentrantLock(); + + /** + * Subscription context that holds per-subscription state. + */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private volatile long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + private volatile Position markDeletePosition; + + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? 
now : now + tickTimeMillis; + } + } + + private final Runnable onEmptyCallback; + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead, + Runnable onEmptyCallback) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.onEmptyCallback = onEmptyCallback; + // Default prune throttle interval: clamp to [5ms, 50ms] using tickTimeMillis as hint + // TODO: make configurable if needed Review Comment: [nitpick] The TODO comment suggests this may need to be configurable in the future. Consider whether this configuration should be added now if it's a critical performance parameter, or create a tracking issue for future work. 
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java: ########## @@ -66,13 +97,35 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon } @VisibleForTesting - InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) { - return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, - isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead); + DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String topicName = dispatcher.getTopic().getName(); + + // Get or create topic-level manager for this topic with onEmpty callback to remove from cache + final TopicDelayedDeliveryTrackerManager[] holder = new TopicDelayedDeliveryTrackerManager[1]; + TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, k -> { + InMemoryTopicDelayedDeliveryTrackerManager m = new InMemoryTopicDelayedDeliveryTrackerManager( + timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, () -> topicManagers.remove(topicName, holder[0])); + holder[0] = m; + return m; + }); Review Comment: The pattern using 'holder[0]' to capture the newly created manager for the onEmpty callback is unnecessarily complex and error-prone. Consider computing the callback outside the lambda or storing the manager reference after creation and then registering the callback. Note: the suggestion below does not compile as written. Its lambda reads `newManager` inside `newManager`'s own initializer, which javac rejects ("variable newManager might not have been initialized"). A self-referencing onEmpty callback therefore still needs some indirection, such as the existing holder array, an `AtomicReference`, or a callback that receives the manager instance as an argument. 
```suggestion TopicDelayedDeliveryTrackerManager manager = topicManagers.get(topicName); if (manager == null) { InMemoryTopicDelayedDeliveryTrackerManager newManager = new InMemoryTopicDelayedDeliveryTrackerManager( timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, () -> topicManagers.remove(topicName, newManager)); TopicDelayedDeliveryTrackerManager existing = topicManagers.putIfAbsent(topicName, newManager); manager = (existing == null) ? newManager : existing; } ``` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java: ########## @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.LongIterator; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. 
+ */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Prune throttling + // Last pruning time + private final AtomicLong lastPruneNanos = new AtomicLong(0); + // Minimum interval between prunes + private final long minPruneIntervalNanos; + + // Fixed-delay detection (parity with legacy behavior) + private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0); + private volatile boolean messagesHaveFixedDelay = true; + + // Per-bucket locks (timestamp -> lock) for fine-grained concurrency + private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>(); + + // Timer state guard + private final ReentrantLock timerLock = new ReentrantLock(); + + /** + * Subscription context that holds per-subscription state. 
+ */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private volatile long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + private volatile Position markDeletePosition; + + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? 
now : now + tickTimeMillis; + } + } + + private final Runnable onEmptyCallback; + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead, + Runnable onEmptyCallback) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.onEmptyCallback = onEmptyCallback; + // Default prune throttle interval: clamp to [5ms, 50ms] using tickTimeMillis as hint + // TODO: make configurable if needed + long pruneMs = Math.max(5L, Math.min(50L, tickTimeMillis)); + this.minPruneIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pruneMs); + } + + // We bucket messages by aligning the deliverAt timestamp to the start of the logical tick window: + // bucketStart = deliverAt - (deliverAt % tickTimeMillis) + // If tickTimeMillis changes over time, the same message may land in different buckets when re-added + // by another subscription. Read paths dedup via TreeSet and counts include duplicates by design. 
+ private long bucketStart(long timestamp) { + long t = this.tickTimeMillis; + if (t <= 0) { + return timestamp; + } + long mod = timestamp % t; + if (mod == 0) { + return timestamp; + } + return timestamp - mod; + } + + @Override + public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, + k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, clock)); + return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext); + } + + @Override + public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + subscriptionContexts.remove(subscriptionName); + // If no more subscriptions, proactively free index and release memory + if (subscriptionContexts.isEmpty()) { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + if (onEmptyCallback != null) { + try { + onEmptyCallback.run(); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("onEmptyCallback failed", t); + } + } + } + } + } + + @Override + public void onTickTimeUpdated(long newTickTimeMillis) { + if (this.tickTimeMillis == newTickTimeMillis) { + return; + } + this.tickTimeMillis = newTickTimeMillis; + // Propagate to all subscriptions + for (SubContext sc : subscriptionContexts.values()) { + sc.tickTimeMillis = newTickTimeMillis; + } + // Re-evaluate timer scheduling with new tick time + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + if (log.isDebugEnabled()) { + log.debug("Updated 
tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis); + } + } + + @Override + public long topicBufferMemoryBytes() { + return bufferMemoryBytes.get(); + } + + @Override + public long topicDelayedMessages() { + return delayedMessagesCount.get(); + } + + @Override + public void close() { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + subscriptionContexts.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + } + + /** + * Add a message to the global delayed message index. + */ + boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, long deliverAt) { + if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) { + return false; + } + + long timestamp = bucketStart(deliverAt); + ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new ReentrantLock()); + bLock.lock(); + try { + Long2ObjectRBTreeMap<Roaring64Bitmap> ledgerMap = + delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()); + Roaring64Bitmap entryIds = ledgerMap.get(ledgerId); + if (entryIds == null) { + entryIds = new Roaring64Bitmap(); + ledgerMap.put(ledgerId, entryIds); + } + long before = entryIds.getLongSizeInBytes(); + if (!entryIds.contains(entryId)) { + entryIds.add(entryId); + delayedMessagesCount.incrementAndGet(); + long after = entryIds.getLongSizeInBytes(); + bufferMemoryBytes.addAndGet(after - before); + } + } finally { + bLock.unlock(); + } + + // Timer update and fixed delay detection + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + checkAndUpdateHighest(deliverAt); + return true; + } + + private void checkAndUpdateHighest(long deliverAt) { + long current; + do { + current = highestDeliveryTimeTracked.get(); + if (deliverAt < (current - tickTimeMillis)) { + 
messagesHaveFixedDelay = false; + } + } while (deliverAt > current && !highestDeliveryTimeTracked.compareAndSet(current, deliverAt)); + } + + /** + * Check if there are messages available for a subscription. + */ + boolean hasMessageAvailableForSub(SubContext subContext) { + if (delayedMessageMap.isEmpty()) { + return false; + } + // Use firstEntry() to avoid NoSuchElementException on concurrent empty map + Map.Entry<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> first = delayedMessageMap.firstEntry(); + if (first == null) { + return false; + } + long cutoffTime = subContext.getCutoffTime(); + long firstTs = first.getKey(); + return firstTs <= cutoffTime; + } + + /** + * Get scheduled messages for a subscription. + */ + NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, int maxMessages) { + NavigableSet<Position> positions = new TreeSet<>(); + int remaining = maxMessages; + + long cutoffTime = subContext.getCutoffTime(); + Position markDelete = subContext.getMarkDeletePosition(); + + // Snapshot of buckets up to cutoff and iterate per-bucket with bucket locks + List<Long> tsList = new ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet()); + for (Long ts : tsList) { + if (remaining <= 0) { + break; + } + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + Long2ObjectRBTreeMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + if (remaining <= 0) { + break; + } + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (markDelete != null && ledgerId < markDelete.getLedgerId()) { + continue; + } + LongIterator it = entryIds.getLongIterator(); + while (it.hasNext() && remaining > 0) { + long entryId = it.next(); + if (markDelete != null && ledgerId == markDelete.getLedgerId() + && entryId <= 
markDelete.getEntryId()) { + continue; + } + positions.add(PositionFactory.create(ledgerId, entryId)); + remaining--; + } + } + } finally { + bLock.unlock(); + } + } + + // Throttled prune: attempt prune even when result is empty (mark-delete might have filtered everything) + // Throttling ensures we don't pay the cost on every call + maybePruneByTime(); + + return positions; + } + + /** + * Check if deliveries should be paused for a subscription. + */ + boolean shouldPauseAllDeliveriesForSub(SubContext subContext) { + // Parity with legacy: pause if all observed delays are fixed and backlog is large enough + return subContext.getFixedDelayDetectionLookahead() > 0 + && messagesHaveFixedDelay + && getNumberOfVisibleDelayedMessagesForSub(subContext) >= subContext.getFixedDelayDetectionLookahead() + && !hasMessageAvailableForSub(subContext); + } + + /** + * Clear delayed messages for a subscription (no-op for topic-level manager). + */ + void clearForSub() { + // No-op: we don't clear global index for individual subscriptions + } + + /** + * Update mark delete position for a subscription. 
+ */ + void updateMarkDeletePosition(SubContext subContext, Position position) { + // Event-driven update from dispatcher; keep it lightweight (no prune here) + subContext.updateMarkDeletePosition(position); + } + + private void updateTimerLocked() { + // Use firstEntry() to avoid NoSuchElementException on concurrent empty map + Map.Entry<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> first = delayedMessageMap.firstEntry(); + if (first == null) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + long nextDeliveryTime = first.getKey(); + long now = clock.millis(); + if (timeout != null && nextDeliveryTime == currentTimeoutTarget && currentTimeoutTarget >= now) { + return; + } + if (timeout != null) { + timeout.cancel(); + } + long delayMillis = nextDeliveryTime - now; + if (delayMillis < 0) { + // Bucket already in the past: schedule immediate to unblock readers + delayMillis = 0; + } + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + currentTimeoutTarget = nextDeliveryTime; + timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); + } + + private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { + // Simplified implementation - returns total count + // Could be enhanced to count only messages visible to this subscription + return delayedMessagesCount.get(); + } + + private void pruneByMinMarkDelete() { + // Find the minimum mark delete position across all subscriptions. + // If any subscription hasn't established a mark-delete yet, skip pruning to preserve global visibility. 
+ Position minMarkDelete = null; + for (SubContext subContext : subscriptionContexts.values()) { + Position markDelete = subContext.getMarkDeletePosition(); + if (markDelete == null) { + return; // at least one subscription without mark-delete -> no pruning + } + if (minMarkDelete == null || markDelete.compareTo(minMarkDelete) < 0) { + minMarkDelete = markDelete; + } + } + + // No idempotency set to clean (Option A): rely on per-bitmap removal below Review Comment: This comment is unclear and appears incomplete. It mentions 'Option A' without explaining what it refers to or what the alternatives are. Consider clarifying this comment or removing it if it's no longer relevant. ```suggestion ``` ########## pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java: ########## @@ -0,0 +1,757 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.delayed; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.mockito.stubbing.Answer; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class InMemoryTopicDeliveryTrackerTest { + + private static class TestEnv { + final Timer timer; + final NavigableMap<Long, TimerTask> tasks; + final Clock clock; + final AtomicLong time; + + TestEnv() { + this.tasks = new TreeMap<>(); + this.time = new AtomicLong(0L); + this.clock = mock(Clock.class); + when(clock.millis()).then((Answer<Long>) invocation -> time.get()); + + this.timer = mock(Timer.class); + when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> { + TimerTask task = invocation.getArgument(0, TimerTask.class); + long timeout = invocation.getArgument(1, 
Long.class); + TimeUnit unit = invocation.getArgument(2, TimeUnit.class); + long scheduleAt = time.get() + unit.toMillis(timeout); + tasks.put(scheduleAt, task); + Timeout t = mock(Timeout.class); + when(t.cancel()).then(i -> { + tasks.remove(scheduleAt, task); + return null; + }); + when(t.isCancelled()).thenReturn(false); + return t; + }); + } + } + + private static AbstractPersistentDispatcherMultipleConsumers newDispatcher(String subName, ManagedCursor cursor) { + AbstractPersistentDispatcherMultipleConsumers dispatcher = + mock(AbstractPersistentDispatcherMultipleConsumers.class); + Subscription subscription = mock(Subscription.class); + when(subscription.getName()).thenReturn(subName); + when(dispatcher.getSubscription()).thenReturn(subscription); + when(dispatcher.getCursor()).thenReturn(cursor); + return dispatcher; + } + + @Test + public void testSingleSubscriptionBasicFlow() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 100; + boolean strict = true; + long lookahead = 10; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, strict, lookahead); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker view = manager.createOrGetView(dispatcher); + + assertFalse(view.hasMessageAvailable()); + + // Add 3 messages in the future + env.time.set(1000); + assertTrue(view.addMessage(1, 1, 1200)); + assertTrue(view.addMessage(1, 2, 1300)); + assertTrue(view.addMessage(2, 1, 1400)); + + assertFalse(view.hasMessageAvailable()); + assertEquals(view.getNumberOfDelayedMessages(), 3); + + // Advance time so first 2 buckets are visible + env.time.set(1350); + assertTrue(view.hasMessageAvailable()); + NavigableSet<Position> scheduled = view.getScheduledMessages(10); + // Should include both positions from first 2 buckets + assertEquals(scheduled.size(), 2); + + 
// Global counter doesn't drop until mark-delete pruning + assertEquals(view.getNumberOfDelayedMessages(), 3); + + // Mark-delete beyond the scheduled positions and prune + ((InMemoryTopicDelayedDeliveryTrackerView) view) + .updateMarkDeletePosition(PositionFactory.create(1L, 2L)); + // Trigger pruning by another get + view.getScheduledMessages(10); + // Now only one entry should remain in global index (prune is throttled -> wait up to 2s) + long start = System.currentTimeMillis(); + while (view.getNumberOfDelayedMessages() != 1 && System.currentTimeMillis() - start < 2000) { + try { + Thread.sleep(30); + } catch (InterruptedException ignored) {} + view.getScheduledMessages(1); + } + assertEquals(view.getNumberOfDelayedMessages(), 1); + + // Cleanup + view.close(); + } + + @Test + public void testSharedIndexDedupAcrossSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + + DelayedDeliveryTracker v1 = manager.createOrGetView(d1); + DelayedDeliveryTracker v2 = manager.createOrGetView(d2); + + env.time.set(1000); + assertTrue(v1.addMessage(10, 20, 2000)); + // Add the same message from another subscription; should be de-duplicated in global index + assertTrue(v2.addMessage(10, 20, 2000)); + + assertEquals(v1.getNumberOfDelayedMessages(), 1); + assertEquals(v2.getNumberOfDelayedMessages(), 1); + + v1.close(); + v2.close(); + } + + @Test + public void testTimerRunTriggersOnlyAvailableSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 100; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new 
InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + DelayedDeliveryTracker v1 = manager.createOrGetView(d1); + DelayedDeliveryTracker v2 = manager.createOrGetView(d2); + + env.time.set(0); + // Add two buckets. Only sub-a will have messages available based on mark-delete + assertTrue(v1.addMessage(1, 1, 500)); + assertTrue(v2.addMessage(1, 2, 500)); + + // Before cutoff + assertFalse(v1.hasMessageAvailable()); + assertFalse(v2.hasMessageAvailable()); + + // Set time after cutoff and set sub-a mark-delete behind entries, sub-b beyond entries + env.time.set(600); + ((InMemoryTopicDelayedDeliveryTrackerView) v1) + .updateMarkDeletePosition(PositionFactory.create(0L, 0L)); // visible for sub-a + ((InMemoryTopicDelayedDeliveryTrackerView) v2) + .updateMarkDeletePosition(PositionFactory.create(1L, 5L)); // filtered for sub-b + + // Invoke manager timer task directly + manager.run(mock(Timeout.class)); + + // Only d1 should be triggered + verify(d1, times(1)).readMoreEntriesAsync(); + verify(d2, times(0)).readMoreEntriesAsync(); + + v1.close(); + v2.close(); + } + + @Test + public void testPauseWithFixedDelays() throws Exception { + TestEnv env = new TestEnv(); + long lookahead = 5; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 10, env.clock, true, lookahead); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + InMemoryTopicDelayedDeliveryTrackerView view = + (InMemoryTopicDelayedDeliveryTrackerView) manager.createOrGetView(dispatcher); + + // Add strictly increasing deliverAt times (fixed delay scenario) + 
env.time.set(0); + for (int i = 1; i <= lookahead; i++) { + assertTrue(view.addMessage(i, i, i * 100L)); + } + assertTrue(view.shouldPauseAllDeliveries()); + + // Move time forward to make messages available -> pause should be lifted + env.time.set(lookahead * 100 + 1); + assertFalse(view.shouldPauseAllDeliveries()); + + view.close(); + } + + @Test + public void testDynamicTickTimeUpdateAffectsCutoff() throws Exception { + TestEnv env = new TestEnv(); + // non-strict mode: cutoff = now + tick + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, false, 0); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker view = manager.createOrGetView(dispatcher); + + env.time.set(1000); + // deliverAt within current tick window -> rejected + assertFalse(view.addMessage(1, 1, 1050)); // cutoff=1100 + assertEquals(view.getNumberOfDelayedMessages(), 0); + + // shrink tick: cutoff reduces -> same deliverAt becomes accepted + view.resetTickTime(10); + assertTrue(view.addMessage(1, 1, 1050)); // cutoff=1010 + assertEquals(view.getNumberOfDelayedMessages(), 1); + + view.close(); + } + + @Test + public void testMinMarkDeleteAcrossSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + InMemoryTopicDelayedDeliveryTrackerView v1 = + (InMemoryTopicDelayedDeliveryTrackerView) manager.createOrGetView(d1); + InMemoryTopicDelayedDeliveryTrackerView v2 = + (InMemoryTopicDelayedDeliveryTrackerView) 
manager.createOrGetView(d2); + + env.time.set(0); + assertTrue(v1.addMessage(1, 1, 100)); + assertTrue(v1.addMessage(1, 2, 100)); + assertTrue(v1.addMessage(2, 1, 100)); + assertEquals(v1.getNumberOfDelayedMessages(), 3); + + // c1 behind, c2 ahead (set via view so manager receives updates) + v1.updateMarkDeletePosition(PositionFactory.create(0L, 0L)); + v2.updateMarkDeletePosition(PositionFactory.create(10L, 10L)); + + env.time.set(200); + // Trigger v2 read + prune attempt; min mark-delete still from c1 => no prune + v2.getScheduledMessages(10); + assertEquals(v1.getNumberOfDelayedMessages(), 3); + + // Advance c1 mark-delete beyond (1,2) + v1.updateMarkDeletePosition(PositionFactory.create(1L, 2L)); + v1.getScheduledMessages(10); + long start2 = System.currentTimeMillis(); + while (v1.getNumberOfDelayedMessages() != 1 && System.currentTimeMillis() - start2 < 2000) { + try { + Thread.sleep(30); + } catch (InterruptedException ignored) { + + } + v1.getScheduledMessages(1); + } + assertEquals(v1.getNumberOfDelayedMessages(), 1); + + v1.close(); + v2.close(); + } + + @Test + public void testTimerSchedulingWindowAlignment() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 1000; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, true, 0); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker view = manager.createOrGetView(dispatcher); + + // Establish lastTickRun via a manual run at t=10000 + env.time.set(10000); + manager.run(mock(Timeout.class)); + + // Add with deliverAt=10001, but tick window alignment should schedule at >= 11000 + assertTrue(view.addMessage(1, 1, 10001)); + long scheduledAt = env.tasks.firstKey(); + assertTrue(scheduledAt >= 11000, "scheduledAt=" + scheduledAt); + + // If no recent tick run, deliverAt should determine + 
env.tasks.clear(); + env.time.set(20000); + // No run -> lastTickRun remains 10000; bucketStart(20005)=20000; schedule aligns to bucket start immediately + assertTrue(view.addMessage(1, 2, 20005)); + long scheduledAt2 = env.tasks.firstKey(); + assertEquals(scheduledAt2, 20000); + + view.close(); + } + + @Test + public void testBufferMemoryUsageAndCleanup() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("sub-a", c); + DelayedDeliveryTracker v = manager.createOrGetView(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 10)); + assertTrue(v.getBufferMemoryUsage() > 0); + + v.close(); + // After last subscription closes, manager should clear index and memory + assertEquals(manager.topicDelayedMessages(), 0); + assertEquals(manager.topicBufferMemoryBytes(), 0); + } + + @Test + public void testGetScheduledMessagesLimit() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub", cursor); + DelayedDeliveryTracker view = manager.createOrGetView(dispatcher); + + env.time.set(1000); + for (int i = 0; i < 10; i++) { + assertTrue(view.addMessage(1, i, 1001)); + } + env.time.set(2000); + NavigableSet<Position> positions = view.getScheduledMessages(3); + assertEquals(positions.size(), 3); + + Position prev = null; + for (Position p : positions) { + if (prev != null) { + assertTrue(prev.compareTo(p) < 0); + } + prev = p; + } + + view.close(); + } + + @Test + public void testHasMessageAvailableIgnoresMarkDelete() throws Exception { + TestEnv env = new TestEnv(); + 
InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, true, 0); + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("s", cursor); + InMemoryTopicDelayedDeliveryTrackerView view = + (InMemoryTopicDelayedDeliveryTrackerView) manager.createOrGetView(dispatcher); + + env.time.set(900); + assertTrue(view.addMessage(1, 1, 1000)); + env.time.set(1000); + view.updateMarkDeletePosition(PositionFactory.create(1, 1)); + assertTrue(view.hasMessageAvailable()); + assertTrue(view.getScheduledMessages(10).isEmpty()); + + view.close(); + } + + @Test + public void testCrossBucketDuplicatesDedupOnRead() throws Exception { + TestEnv env = new TestEnv(); + long tick = 256; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tick, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("s1", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("s2", c2); + DelayedDeliveryTracker v1 = manager.createOrGetView(d1); + DelayedDeliveryTracker v2 = manager.createOrGetView(d2); + + env.time.set(1000); + long deliverAt = 1023; + assertTrue(v1.addMessage(9, 9, deliverAt)); + long before = manager.topicBufferMemoryBytes(); + + v2.resetTickTime(32); + assertTrue(v2.addMessage(9, 9, deliverAt)); + + env.time.set(2000); + NavigableSet<Position> scheduled = v1.getScheduledMessages(10); + assertEquals(scheduled.size(), 1); + assertTrue(manager.topicDelayedMessages() >= 1); + assertTrue(manager.topicBufferMemoryBytes() > before); + + v1.close(); + v2.close(); + } + + @Test + public void testClearIsNoOp() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new 
InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetView(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 10)); + long before = manager.topicDelayedMessages(); + v.clear().join(); + assertEquals(manager.topicDelayedMessages(), before); + v.close(); + } + + @Test + public void testMultiSubscriptionCloseDoesNotClear() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("s1", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("s2", c2); + DelayedDeliveryTracker v1 = manager.createOrGetView(d1); + DelayedDeliveryTracker v2 = manager.createOrGetView(d2); + + env.time.set(0); + assertTrue(v1.addMessage(1, 1, 10)); + assertTrue(manager.topicDelayedMessages() > 0); + + v1.close(); + assertTrue(manager.topicDelayedMessages() > 0); + // Move time forward so remaining view can read + env.time.set(20); + assertFalse(v2.getScheduledMessages(10).isEmpty()); + + v2.close(); + assertEquals(manager.topicDelayedMessages(), 0); + } + + @Test + public void testBoundaryInputsRejected() throws Exception { + TestEnv env = new TestEnv(); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + + InMemoryTopicDelayedDeliveryTrackerManager mStrict = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, true, 0); + DelayedDeliveryTracker vStrict = mStrict.createOrGetView(d); + env.time.set(1000); + assertFalse(vStrict.addMessage(1, 1, -1)); + assertFalse(vStrict.addMessage(1, 2, 1000)); 
+ vStrict.close(); + + InMemoryTopicDelayedDeliveryTrackerManager mNonStrict = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, false, 0); + DelayedDeliveryTracker vNon = mNonStrict.createOrGetView(d); + env.time.set(1000); + assertFalse(vNon.addMessage(1, 3, 1100)); + vNon.close(); + } + + private static void expectIllegalState(Runnable r) { + try { + r.run(); + org.testng.Assert.fail("Expected IllegalStateException"); + } catch (IllegalStateException expected) { + // ok + } + } + + @Test + public void testClosedViewThrowsOnOperations() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + InMemoryTopicDelayedDeliveryTrackerView v = + (InMemoryTopicDelayedDeliveryTrackerView) manager.createOrGetView(d); + v.close(); + + expectIllegalState(() -> v.addMessage(1, 1, 10)); + expectIllegalState(v::hasMessageAvailable); + expectIllegalState(() -> v.getScheduledMessages(1)); + expectIllegalState(v::clear); + } + + @Test + public void testRescheduleOnEarlierDeliverAt() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetView(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 10)); + assertEquals(env.tasks.firstKey().longValue(), 10L); + + assertTrue(v.addMessage(1, 2, 5)); + assertEquals(env.tasks.size(), 1); + assertEquals(env.tasks.firstKey().longValue(), 5L); + + v.close(); + } + + @Test + public void testEmptyIndexCancelsTimerOnClose() throws Exception { + TestEnv env = new 
TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetView(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 1000)); + assertFalse(env.tasks.isEmpty()); + v.close(); + assertTrue(env.tasks.isEmpty()); + } + + @Test + public void testMemoryGrowthAndPruneShrink() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 10, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + InMemoryTopicDelayedDeliveryTrackerView v = + (InMemoryTopicDelayedDeliveryTrackerView) manager.createOrGetView(d); + + env.time.set(0); + for (int i = 0; i < 50; i++) { + assertTrue(v.addMessage(1, i, 100)); + } + long memBefore = manager.topicBufferMemoryBytes(); + assertTrue(memBefore > 0); + + env.time.set(200); + v.updateMarkDeletePosition(PositionFactory.create(1, 25)); + v.getScheduledMessages(100); + // Wait for prune-by-time throttling window and trigger reads to allow prune to occur + long memAfter = manager.topicBufferMemoryBytes(); + long startWall = System.currentTimeMillis(); + while (memAfter >= memBefore && System.currentTimeMillis() - startWall < 2000) { Review Comment: Another polling loop with Thread.sleep(). This pattern appears multiple times in the test file. Consider extracting a common utility method like 'waitForCondition(Supplier<Boolean> condition, Duration timeout)' to reduce code duplication and improve maintainability. 
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java: ########## @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.LongIterator; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory 
implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Prune throttling + // Last pruning time + private final AtomicLong lastPruneNanos = new AtomicLong(0); + // Minimum interval between prunes + private final long minPruneIntervalNanos; + + // Fixed-delay detection (parity with legacy behavior) + private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0); + private volatile boolean messagesHaveFixedDelay = true; + + // Per-bucket locks (timestamp -> lock) for fine-grained concurrency + private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>(); + + // 
Timer state guard + private final ReentrantLock timerLock = new ReentrantLock(); + + /** + * Subscription context that holds per-subscription state. + */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private volatile long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + private volatile Position markDeletePosition; + + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? 
now : now + tickTimeMillis; + } + } + + private final Runnable onEmptyCallback; + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead, + Runnable onEmptyCallback) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.onEmptyCallback = onEmptyCallback; + // Default prune throttle interval: clamp to [5ms, 50ms] using tickTimeMillis as hint + // TODO: make configurable if needed + long pruneMs = Math.max(5L, Math.min(50L, tickTimeMillis)); + this.minPruneIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pruneMs); + } + + // We bucket messages by aligning the deliverAt timestamp to the start of the logical tick window: + // bucketStart = deliverAt - (deliverAt % tickTimeMillis) + // If tickTimeMillis changes over time, the same message may land in different buckets when re-added + // by another subscription. Read paths dedup via TreeSet and counts include duplicates by design. 
+ private long bucketStart(long timestamp) { + long t = this.tickTimeMillis; + if (t <= 0) { + return timestamp; + } + long mod = timestamp % t; + if (mod == 0) { + return timestamp; + } + return timestamp - mod; + } + + @Override + public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, + k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, clock)); + return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext); + } + + @Override + public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + subscriptionContexts.remove(subscriptionName); + // If no more subscriptions, proactively free index and release memory + if (subscriptionContexts.isEmpty()) { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + if (onEmptyCallback != null) { + try { + onEmptyCallback.run(); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("onEmptyCallback failed", t); + } + } + } + } + } + + @Override + public void onTickTimeUpdated(long newTickTimeMillis) { + if (this.tickTimeMillis == newTickTimeMillis) { + return; + } + this.tickTimeMillis = newTickTimeMillis; + // Propagate to all subscriptions + for (SubContext sc : subscriptionContexts.values()) { + sc.tickTimeMillis = newTickTimeMillis; + } + // Re-evaluate timer scheduling with new tick time + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + if (log.isDebugEnabled()) { + log.debug("Updated 
tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis); + } + } + + @Override + public long topicBufferMemoryBytes() { + return bufferMemoryBytes.get(); + } + + @Override + public long topicDelayedMessages() { + return delayedMessagesCount.get(); + } + + @Override + public void close() { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + subscriptionContexts.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + } + + /** + * Add a message to the global delayed message index. + */ + boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, long deliverAt) { + if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) { + return false; + } + + long timestamp = bucketStart(deliverAt); + ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new ReentrantLock()); + bLock.lock(); + try { + Long2ObjectRBTreeMap<Roaring64Bitmap> ledgerMap = + delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()); + Roaring64Bitmap entryIds = ledgerMap.get(ledgerId); + if (entryIds == null) { + entryIds = new Roaring64Bitmap(); + ledgerMap.put(ledgerId, entryIds); + } + long before = entryIds.getLongSizeInBytes(); + if (!entryIds.contains(entryId)) { + entryIds.add(entryId); + delayedMessagesCount.incrementAndGet(); + long after = entryIds.getLongSizeInBytes(); + bufferMemoryBytes.addAndGet(after - before); + } + } finally { + bLock.unlock(); + } + + // Timer update and fixed delay detection + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + checkAndUpdateHighest(deliverAt); + return true; + } + + private void checkAndUpdateHighest(long deliverAt) { + long current; + do { + current = highestDeliveryTimeTracked.get(); + if (deliverAt < (current - tickTimeMillis)) { + 
messagesHaveFixedDelay = false; + } + } while (deliverAt > current && !highestDeliveryTimeTracked.compareAndSet(current, deliverAt)); + } + + /** + * Check if there are messages available for a subscription. + */ + boolean hasMessageAvailableForSub(SubContext subContext) { + if (delayedMessageMap.isEmpty()) { + return false; + } + // Use firstEntry() to avoid NoSuchElementException on concurrent empty map + Map.Entry<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> first = delayedMessageMap.firstEntry(); + if (first == null) { + return false; + } + long cutoffTime = subContext.getCutoffTime(); + long firstTs = first.getKey(); + return firstTs <= cutoffTime; + } + + /** + * Get scheduled messages for a subscription. + */ + NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, int maxMessages) { + NavigableSet<Position> positions = new TreeSet<>(); + int remaining = maxMessages; + + long cutoffTime = subContext.getCutoffTime(); + Position markDelete = subContext.getMarkDeletePosition(); + + // Snapshot of buckets up to cutoff and iterate per-bucket with bucket locks + List<Long> tsList = new ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet()); + for (Long ts : tsList) { + if (remaining <= 0) { + break; + } + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + Long2ObjectRBTreeMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + if (remaining <= 0) { + break; + } + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (markDelete != null && ledgerId < markDelete.getLedgerId()) { + continue; + } + LongIterator it = entryIds.getLongIterator(); + while (it.hasNext() && remaining > 0) { + long entryId = it.next(); + if (markDelete != null && ledgerId == markDelete.getLedgerId() + && entryId <= 
markDelete.getEntryId()) { + continue; + } + positions.add(PositionFactory.create(ledgerId, entryId)); + remaining--; + } + } + } finally { + bLock.unlock(); + } + } + + // Throttled prune: attempt prune even when result is empty (mark-delete might have filtered everything) + // Throttling ensures we don't pay the cost on every call + maybePruneByTime(); + + return positions; + } + + /** + * Check if deliveries should be paused for a subscription. + */ + boolean shouldPauseAllDeliveriesForSub(SubContext subContext) { + // Parity with legacy: pause if all observed delays are fixed and backlog is large enough + return subContext.getFixedDelayDetectionLookahead() > 0 + && messagesHaveFixedDelay + && getNumberOfVisibleDelayedMessagesForSub(subContext) >= subContext.getFixedDelayDetectionLookahead() + && !hasMessageAvailableForSub(subContext); + } + + /** + * Clear delayed messages for a subscription (no-op for topic-level manager). + */ + void clearForSub() { + // No-op: we don't clear global index for individual subscriptions + } + + /** + * Update mark delete position for a subscription. 
+ */ + void updateMarkDeletePosition(SubContext subContext, Position position) { + // Event-driven update from dispatcher; keep it lightweight (no prune here) + subContext.updateMarkDeletePosition(position); + } + + private void updateTimerLocked() { + // Use firstEntry() to avoid NoSuchElementException on concurrent empty map + Map.Entry<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> first = delayedMessageMap.firstEntry(); + if (first == null) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + long nextDeliveryTime = first.getKey(); + long now = clock.millis(); + if (timeout != null && nextDeliveryTime == currentTimeoutTarget && currentTimeoutTarget >= now) { + return; + } + if (timeout != null) { + timeout.cancel(); + } + long delayMillis = nextDeliveryTime - now; + if (delayMillis < 0) { + // Bucket already in the past: schedule immediate to unblock readers + delayMillis = 0; + } + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + currentTimeoutTarget = nextDeliveryTime; + timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); + } + + private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { + // Simplified implementation - returns total count + // Could be enhanced to count only messages visible to this subscription + return delayedMessagesCount.get(); + } + + private void pruneByMinMarkDelete() { + // Find the minimum mark delete position across all subscriptions. + // If any subscription hasn't established a mark-delete yet, skip pruning to preserve global visibility. 
+ Position minMarkDelete = null; + for (SubContext subContext : subscriptionContexts.values()) { + Position markDelete = subContext.getMarkDeletePosition(); + if (markDelete == null) { Review Comment: [nitpick] The comment could be clearer about the rationale for skipping pruning when any subscription lacks a mark-delete position. Consider expanding it to explain why: a subscription that has not yet reported a mark-delete position applies no mark-delete filter on reads, so it may still need any entry in the shared index, and pruning by the minimum of the other subscriptions' positions could remove delayed messages it has not consumed yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
