Copilot commented on code in PR #24927:
URL: https://github.com/apache/pulsar/pull/24927#discussion_r2483706386


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.LongIterator;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that 
is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription 
scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements 
TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    // Outer: sorted by timestamp for efficient finding of earliest bucket
+    // Inner: per-ledger bitmaps of entry-ids
+    private final ConcurrentSkipListMap<Long, 
Long2ObjectRBTreeMap<Roaring64Bitmap>> delayedMessageMap =
+            new ConcurrentSkipListMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final ConcurrentHashMap<String, SubContext> subscriptionContexts = 
new ConcurrentHashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+    // Last time the TimerTask was triggered
+    private long lastTickRun = 0L;
+
+    // Configuration
+    private long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Prune throttling
+    // Last pruning time
+    private final AtomicLong lastPruneNanos = new AtomicLong(0);
+    // Minimum interval between prunes
+    private final long minPruneIntervalNanos;
+
+    // Fixed-delay detection (parity with legacy behavior)
+    private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0);
+    private volatile boolean messagesHaveFixedDelay = true;
+
+    // Per-bucket locks (timestamp -> lock) for fine-grained concurrency
+    private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new 
ConcurrentHashMap<>();
+
+    // Timer state guard
+    private final ReentrantLock timerLock = new ReentrantLock();
+
+    /**
+     * Subscription context that holds per-subscription state.
+     */
+    @Getter
+    static class SubContext {
+        private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+        private final String subscriptionName;
+        private volatile long tickTimeMillis;
+        private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+        private final long fixedDelayDetectionLookahead;
+        private final Clock clock;
+        private volatile Position markDeletePosition;
+
+        SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, 
long tickTimeMillis,
+                   boolean isDelayedDeliveryDeliverAtTimeStrict, long 
fixedDelayDetectionLookahead,
+                   Clock clock) {
+            this.dispatcher = dispatcher;
+            this.subscriptionName = dispatcher.getSubscription().getName();
+            this.tickTimeMillis = tickTimeMillis;
+            this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+            this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+            this.clock = clock;
+        }
+
+        void updateMarkDeletePosition(Position position) {
+            this.markDeletePosition = position;
+        }
+
+        long getCutoffTime() {
+            long now = clock.millis();
+            return isDelayedDeliveryDeliverAtTimeStrict ? now : now + 
tickTimeMillis;
+        }
+    }
+
+    private final Runnable onEmptyCallback;
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead,
+                                                      Runnable 
onEmptyCallback) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.onEmptyCallback = onEmptyCallback;
+        // Default prune throttle interval: clamp to [5ms, 50ms] using 
tickTimeMillis as hint
+        // TODO: make configurable if needed

Review Comment:
   [nitpick] The TODO comment suggests the prune throttle interval may need to 
be configurable in the future. If it is a critical performance parameter, 
consider adding the configuration now; otherwise, create a tracking issue for 
the future work.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java:
##########
@@ -66,13 +97,35 @@ public DelayedDeliveryTracker 
newTracker(AbstractPersistentDispatcherMultipleCon
     }
 
     @VisibleForTesting
-    InMemoryDelayedDeliveryTracker 
newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
-        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis,
-                isDelayedDeliveryDeliverAtTimeStrict, 
fixedDelayDetectionLookahead);
+    DelayedDeliveryTracker 
newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String topicName = dispatcher.getTopic().getName();
+
+        // Get or create topic-level manager for this topic with onEmpty 
callback to remove from cache
+        final TopicDelayedDeliveryTrackerManager[] holder = new 
TopicDelayedDeliveryTrackerManager[1];
+        TopicDelayedDeliveryTrackerManager manager = 
topicManagers.computeIfAbsent(topicName, k -> {
+            InMemoryTopicDelayedDeliveryTrackerManager m = new 
InMemoryTopicDelayedDeliveryTrackerManager(
+                    timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
+                    fixedDelayDetectionLookahead, () -> 
topicManagers.remove(topicName, holder[0]));
+            holder[0] = m;
+            return m;
+        });

Review Comment:
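   The pattern using 'holder[0]' to capture the newly created manager for the 
onEmpty callback is unnecessarily complex and error-prone. Consider computing 
the callback outside the lambda or storing the manager reference after creation 
and then registering the callback. Note that the snippet below references 
'newManager' inside the lambda in its own initializer, which Java rejects for 
local variables (the variable is not definitely assigned at that point), so the 
callback must be attached after the manager has been constructed.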
   ```suggestion
           TopicDelayedDeliveryTrackerManager manager = 
topicManagers.get(topicName);
           if (manager == null) {
               InMemoryTopicDelayedDeliveryTrackerManager newManager = new 
InMemoryTopicDelayedDeliveryTrackerManager(
                       timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
                       fixedDelayDetectionLookahead, () -> 
topicManagers.remove(topicName, newManager));
               TopicDelayedDeliveryTrackerManager existing = 
topicManagers.putIfAbsent(topicName, newManager);
               manager = (existing == null) ? newManager : existing;
           }
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.LongIterator;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that 
is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription 
scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements 
TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    // Outer: sorted by timestamp for efficient finding of earliest bucket
+    // Inner: per-ledger bitmaps of entry-ids
+    private final ConcurrentSkipListMap<Long, 
Long2ObjectRBTreeMap<Roaring64Bitmap>> delayedMessageMap =
+            new ConcurrentSkipListMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final ConcurrentHashMap<String, SubContext> subscriptionContexts = 
new ConcurrentHashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+    // Last time the TimerTask was triggered
+    private long lastTickRun = 0L;
+
+    // Configuration
+    private long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Prune throttling
+    // Last pruning time
+    private final AtomicLong lastPruneNanos = new AtomicLong(0);
+    // Minimum interval between prunes
+    private final long minPruneIntervalNanos;
+
+    // Fixed-delay detection (parity with legacy behavior)
+    private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0);
+    private volatile boolean messagesHaveFixedDelay = true;
+
+    // Per-bucket locks (timestamp -> lock) for fine-grained concurrency
+    private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new 
ConcurrentHashMap<>();
+
+    // Timer state guard
+    private final ReentrantLock timerLock = new ReentrantLock();
+
+    /**
+     * Subscription context that holds per-subscription state.
+     */
+    @Getter
+    static class SubContext {
+        private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+        private final String subscriptionName;
+        private volatile long tickTimeMillis;
+        private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+        private final long fixedDelayDetectionLookahead;
+        private final Clock clock;
+        private volatile Position markDeletePosition;
+
+        SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, 
long tickTimeMillis,
+                   boolean isDelayedDeliveryDeliverAtTimeStrict, long 
fixedDelayDetectionLookahead,
+                   Clock clock) {
+            this.dispatcher = dispatcher;
+            this.subscriptionName = dispatcher.getSubscription().getName();
+            this.tickTimeMillis = tickTimeMillis;
+            this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+            this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+            this.clock = clock;
+        }
+
+        void updateMarkDeletePosition(Position position) {
+            this.markDeletePosition = position;
+        }
+
+        long getCutoffTime() {
+            long now = clock.millis();
+            return isDelayedDeliveryDeliverAtTimeStrict ? now : now + 
tickTimeMillis;
+        }
+    }
+
+    private final Runnable onEmptyCallback;
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead,
+                                                      Runnable 
onEmptyCallback) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.onEmptyCallback = onEmptyCallback;
+        // Default prune throttle interval: clamp to [5ms, 50ms] using 
tickTimeMillis as hint
+        // TODO: make configurable if needed
+        long pruneMs = Math.max(5L, Math.min(50L, tickTimeMillis));
+        this.minPruneIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pruneMs);
+    }
+
+    // We bucket messages by aligning the deliverAt timestamp to the start of 
the logical tick window:
+    // bucketStart = deliverAt - (deliverAt % tickTimeMillis)
+    // If tickTimeMillis changes over time, the same message may land in 
different buckets when re-added
+    // by another subscription. Read paths dedup via TreeSet and counts 
include duplicates by design.
+    private long bucketStart(long timestamp) {
+        long t = this.tickTimeMillis;
+        if (t <= 0) {
+            return timestamp;
+        }
+        long mod = timestamp % t;
+        if (mod == 0) {
+            return timestamp;
+        }
+        return timestamp - mod;
+    }
+
+    @Override
+    public DelayedDeliveryTracker 
createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        SubContext subContext = 
subscriptionContexts.computeIfAbsent(subscriptionName,
+                k -> new SubContext(dispatcher, tickTimeMillis, 
isDelayedDeliveryDeliverAtTimeStrict,
+                        fixedDelayDetectionLookahead, clock));
+        return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);
+    }
+
+    @Override
+    public void unregister(AbstractPersistentDispatcherMultipleConsumers 
dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        subscriptionContexts.remove(subscriptionName);
+        // If no more subscriptions, proactively free index and release memory
+        if (subscriptionContexts.isEmpty()) {
+            timerLock.lock();
+            try {
+                if (timeout != null) {
+                    timeout.cancel();
+                    timeout = null;
+                }
+                currentTimeoutTarget = -1;
+            } finally {
+                timerLock.unlock();
+            }
+            delayedMessageMap.clear();
+            bucketLocks.clear();
+            delayedMessagesCount.set(0);
+            bufferMemoryBytes.set(0);
+            if (onEmptyCallback != null) {
+                try {
+                    onEmptyCallback.run();
+                } catch (Throwable t) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("onEmptyCallback failed", t);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onTickTimeUpdated(long newTickTimeMillis) {
+        if (this.tickTimeMillis == newTickTimeMillis) {
+            return;
+        }
+        this.tickTimeMillis = newTickTimeMillis;
+        // Propagate to all subscriptions
+        for (SubContext sc : subscriptionContexts.values()) {
+            sc.tickTimeMillis = newTickTimeMillis;
+        }
+        // Re-evaluate timer scheduling with new tick time
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Updated tickTimeMillis for topic-level delayed delivery 
manager to {} ms", newTickTimeMillis);
+        }
+    }
+
+    @Override
+    public long topicBufferMemoryBytes() {
+        return bufferMemoryBytes.get();
+    }
+
+    @Override
+    public long topicDelayedMessages() {
+        return delayedMessagesCount.get();
+    }
+
+    @Override
+    public void close() {
+        timerLock.lock();
+        try {
+            if (timeout != null) {
+                timeout.cancel();
+                timeout = null;
+            }
+            currentTimeoutTarget = -1;
+        } finally {
+            timerLock.unlock();
+        }
+        delayedMessageMap.clear();
+        bucketLocks.clear();
+        subscriptionContexts.clear();
+        delayedMessagesCount.set(0);
+        bufferMemoryBytes.set(0);
+    }
+
+    /**
+     * Add a message to the global delayed message index.
+     */
+    boolean addMessageForSub(SubContext subContext, long ledgerId, long 
entryId, long deliverAt) {
+        if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) {
+            return false;
+        }
+
+        long timestamp = bucketStart(deliverAt);
+        ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new 
ReentrantLock());
+        bLock.lock();
+        try {
+            Long2ObjectRBTreeMap<Roaring64Bitmap> ledgerMap =
+                    delayedMessageMap.computeIfAbsent(timestamp, k -> new 
Long2ObjectRBTreeMap<>());
+            Roaring64Bitmap entryIds = ledgerMap.get(ledgerId);
+            if (entryIds == null) {
+                entryIds = new Roaring64Bitmap();
+                ledgerMap.put(ledgerId, entryIds);
+            }
+            long before = entryIds.getLongSizeInBytes();
+            if (!entryIds.contains(entryId)) {
+                entryIds.add(entryId);
+                delayedMessagesCount.incrementAndGet();
+                long after = entryIds.getLongSizeInBytes();
+                bufferMemoryBytes.addAndGet(after - before);
+            }
+        } finally {
+            bLock.unlock();
+        }
+
+        // Timer update and fixed delay detection
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        checkAndUpdateHighest(deliverAt);
+        return true;
+    }
+
+    private void checkAndUpdateHighest(long deliverAt) {
+        long current;
+        do {
+            current = highestDeliveryTimeTracked.get();
+            if (deliverAt < (current - tickTimeMillis)) {
+                messagesHaveFixedDelay = false;
+            }
+        } while (deliverAt > current && 
!highestDeliveryTimeTracked.compareAndSet(current, deliverAt));
+    }
+
+    /**
+     * Check if there are messages available for a subscription.
+     */
+    boolean hasMessageAvailableForSub(SubContext subContext) {
+        if (delayedMessageMap.isEmpty()) {
+            return false;
+        }
+        // Use firstEntry() to avoid NoSuchElementException on concurrent 
empty map
+        Map.Entry<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> first = 
delayedMessageMap.firstEntry();
+        if (first == null) {
+            return false;
+        }
+        long cutoffTime = subContext.getCutoffTime();
+        long firstTs = first.getKey();
+        return firstTs <= cutoffTime;
+    }
+
+    /**
+     * Get scheduled messages for a subscription.
+     */
+    NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, 
int maxMessages) {
+        NavigableSet<Position> positions = new TreeSet<>();
+        int remaining = maxMessages;
+
+        long cutoffTime = subContext.getCutoffTime();
+        Position markDelete = subContext.getMarkDeletePosition();
+
+        // Snapshot of buckets up to cutoff and iterate per-bucket with bucket 
locks
+        List<Long> tsList = new 
ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet());
+        for (Long ts : tsList) {
+            if (remaining <= 0) {
+                break;
+            }
+            ReentrantLock bLock = bucketLocks.get(ts);
+            if (bLock == null) {
+                continue;
+            }
+            bLock.lock();
+            try {
+                Long2ObjectRBTreeMap<Roaring64Bitmap> ledgerMap = 
delayedMessageMap.get(ts);
+                if (ledgerMap == null) {
+                    continue;
+                }
+                for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : 
ledgerMap.long2ObjectEntrySet()) {
+                    if (remaining <= 0) {
+                        break;
+                    }
+                    long ledgerId = ledgerEntry.getLongKey();
+                    Roaring64Bitmap entryIds = ledgerEntry.getValue();
+                    if (markDelete != null && ledgerId < 
markDelete.getLedgerId()) {
+                        continue;
+                    }
+                    LongIterator it = entryIds.getLongIterator();
+                    while (it.hasNext() && remaining > 0) {
+                        long entryId = it.next();
+                        if (markDelete != null && ledgerId == 
markDelete.getLedgerId()
+                                && entryId <= markDelete.getEntryId()) {
+                            continue;
+                        }
+                        positions.add(PositionFactory.create(ledgerId, 
entryId));
+                        remaining--;
+                    }
+                }
+            } finally {
+                bLock.unlock();
+            }
+        }
+
+        // Throttled prune: attempt prune even when result is empty 
(mark-delete might have filtered everything)
+        // Throttling ensures we don't pay the cost on every call
+        maybePruneByTime();
+
+        return positions;
+    }
+
+    /**
+     * Check if deliveries should be paused for a subscription.
+     */
+    boolean shouldPauseAllDeliveriesForSub(SubContext subContext) {
+        // Parity with legacy: pause if all observed delays are fixed and 
backlog is large enough
+        return subContext.getFixedDelayDetectionLookahead() > 0
+                && messagesHaveFixedDelay
+                && getNumberOfVisibleDelayedMessagesForSub(subContext) >= 
subContext.getFixedDelayDetectionLookahead()
+                && !hasMessageAvailableForSub(subContext);
+    }
+
+    /**
+     * Clear delayed messages for a subscription (no-op for topic-level 
manager).
+     */
+    void clearForSub() {
+        // No-op: we don't clear global index for individual subscriptions
+    }
+
+    /**
+     * Update mark delete position for a subscription.
+     */
+    void updateMarkDeletePosition(SubContext subContext, Position position) {
+        // Event-driven update from dispatcher; keep it lightweight (no prune 
here)
+        subContext.updateMarkDeletePosition(position);
+    }
+
+    private void updateTimerLocked() {
+        // Use firstEntry() to avoid NoSuchElementException on concurrent 
empty map
+        Map.Entry<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> first = 
delayedMessageMap.firstEntry();
+        if (first == null) {
+            if (timeout != null) {
+                currentTimeoutTarget = -1;
+                timeout.cancel();
+                timeout = null;
+            }
+            return;
+        }
+        long nextDeliveryTime = first.getKey();
+        long now = clock.millis();
+        if (timeout != null && nextDeliveryTime == currentTimeoutTarget && 
currentTimeoutTarget >= now) {
+            return;
+        }
+        if (timeout != null) {
+            timeout.cancel();
+        }
+        long delayMillis = nextDeliveryTime - now;
+        if (delayMillis < 0) {
+            // Bucket already in the past: schedule immediate to unblock 
readers
+            delayMillis = 0;
+        }
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
+        currentTimeoutTarget = nextDeliveryTime;
+        timeout = timer.newTimeout(this, calculatedDelayMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    private long getNumberOfVisibleDelayedMessagesForSub(SubContext 
subContext) {
+        // Simplified implementation - returns total count
+        // Could be enhanced to count only messages visible to this 
subscription
+        return delayedMessagesCount.get();
+    }
+
+    private void pruneByMinMarkDelete() {
+        // Find the minimum mark delete position across all subscriptions.
+        // If any subscription hasn't established a mark-delete yet, skip 
pruning to preserve global visibility.
+        Position minMarkDelete = null;
+        for (SubContext subContext : subscriptionContexts.values()) {
+            Position markDelete = subContext.getMarkDeletePosition();
+            if (markDelete == null) {
+                return; // at least one subscription without mark-delete -> no 
pruning
+            }
+            if (minMarkDelete == null || markDelete.compareTo(minMarkDelete) < 
0) {
+                minMarkDelete = markDelete;
+            }
+        }
+
+        // No idempotency set to clean (Option A): rely on per-bitmap removal 
below

Review Comment:
   This comment is unclear and appears incomplete. It mentions 'Option A' 
without explaining what it refers to or what the alternatives are. Consider 
clarifying this comment or removing it if it's no longer relevant.
   ```suggestion
   
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java:
##########
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.Subscription;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class InMemoryTopicDeliveryTrackerTest {
+
+    private static class TestEnv {
+        final Timer timer;
+        final NavigableMap<Long, TimerTask> tasks;
+        final Clock clock;
+        final AtomicLong time;
+
+        TestEnv() {
+            this.tasks = new TreeMap<>();
+            this.time = new AtomicLong(0L);
+            this.clock = mock(Clock.class);
+            when(clock.millis()).then((Answer<Long>) invocation -> time.get());
+
+            this.timer = mock(Timer.class);
+            when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> 
{
+                TimerTask task = invocation.getArgument(0, TimerTask.class);
+                long timeout = invocation.getArgument(1, Long.class);
+                TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
+                long scheduleAt = time.get() + unit.toMillis(timeout);
+                tasks.put(scheduleAt, task);
+                Timeout t = mock(Timeout.class);
+                when(t.cancel()).then(i -> {
+                    tasks.remove(scheduleAt, task);
+                    return null;
+                });
+                when(t.isCancelled()).thenReturn(false);
+                return t;
+            });
+        }
+    }
+
+    private static AbstractPersistentDispatcherMultipleConsumers 
newDispatcher(String subName, ManagedCursor cursor) {
+        AbstractPersistentDispatcherMultipleConsumers dispatcher =
+                mock(AbstractPersistentDispatcherMultipleConsumers.class);
+        Subscription subscription = mock(Subscription.class);
+        when(subscription.getName()).thenReturn(subName);
+        when(dispatcher.getSubscription()).thenReturn(subscription);
+        when(dispatcher.getCursor()).thenReturn(cursor);
+        return dispatcher;
+    }
+
+    @Test
+    public void testSingleSubscriptionBasicFlow() throws Exception {
+        TestEnv env = new TestEnv();
+        long tickMs = 100;
+        boolean strict = true;
+        long lookahead = 10;
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 
tickMs, env.clock, strict, lookahead);
+
+        ManagedCursor cursor = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers dispatcher = 
newDispatcher("sub-a", cursor);
+        DelayedDeliveryTracker view = manager.createOrGetView(dispatcher);
+
+        assertFalse(view.hasMessageAvailable());
+
+        // Add 3 messages in the future
+        env.time.set(1000);
+        assertTrue(view.addMessage(1, 1, 1200));
+        assertTrue(view.addMessage(1, 2, 1300));
+        assertTrue(view.addMessage(2, 1, 1400));
+
+        assertFalse(view.hasMessageAvailable());
+        assertEquals(view.getNumberOfDelayedMessages(), 3);
+
+        // Advance time so first 2 buckets are visible
+        env.time.set(1350);
+        assertTrue(view.hasMessageAvailable());
+        NavigableSet<Position> scheduled = view.getScheduledMessages(10);
+        // Should include both positions from first 2 buckets
+        assertEquals(scheduled.size(), 2);
+
+        // Global counter doesn't drop until mark-delete pruning
+        assertEquals(view.getNumberOfDelayedMessages(), 3);
+
+        // Mark-delete beyond the scheduled positions and prune
+        ((InMemoryTopicDelayedDeliveryTrackerView) view)
+                .updateMarkDeletePosition(PositionFactory.create(1L, 2L));
+        // Trigger pruning by another get
+        view.getScheduledMessages(10);
+        // Now only one entry should remain in global index (prune is 
throttled -> wait up to 2s)
+        long start = System.currentTimeMillis();
+        while (view.getNumberOfDelayedMessages() != 1 && 
System.currentTimeMillis() - start < 2000) {
+            try {
+                Thread.sleep(30);
+            } catch (InterruptedException ignored) {}
+            view.getScheduledMessages(1);
+        }
+        assertEquals(view.getNumberOfDelayedMessages(), 1);
+
+        // Cleanup
+        view.close();
+    }
+
+    @Test
+    public void testSharedIndexDedupAcrossSubscriptions() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, 
env.clock, true, 0);
+
+        ManagedCursor c1 = mock(ManagedCursor.class);
+        ManagedCursor c2 = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d1 = 
newDispatcher("sub-a", c1);
+        AbstractPersistentDispatcherMultipleConsumers d2 = 
newDispatcher("sub-b", c2);
+
+        DelayedDeliveryTracker v1 = manager.createOrGetView(d1);
+        DelayedDeliveryTracker v2 = manager.createOrGetView(d2);
+
+        env.time.set(1000);
+        assertTrue(v1.addMessage(10, 20, 2000));
+        // Add the same message from another subscription; should be 
de-duplicated in global index
+        assertTrue(v2.addMessage(10, 20, 2000));
+
+        assertEquals(v1.getNumberOfDelayedMessages(), 1);
+        assertEquals(v2.getNumberOfDelayedMessages(), 1);
+
+        v1.close();
+        v2.close();
+    }
+
+    @Test
+    public void testTimerRunTriggersOnlyAvailableSubscriptions() throws 
Exception {
+        TestEnv env = new TestEnv();
+        long tickMs = 100;
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 
tickMs, env.clock, true, 0);
+
+        ManagedCursor c1 = mock(ManagedCursor.class);
+        ManagedCursor c2 = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d1 = 
newDispatcher("sub-a", c1);
+        AbstractPersistentDispatcherMultipleConsumers d2 = 
newDispatcher("sub-b", c2);
+        DelayedDeliveryTracker v1 = manager.createOrGetView(d1);
+        DelayedDeliveryTracker v2 = manager.createOrGetView(d2);
+
+        env.time.set(0);
+        // Add two buckets. Only sub-a will have messages available based on 
mark-delete
+        assertTrue(v1.addMessage(1, 1, 500));
+        assertTrue(v2.addMessage(1, 2, 500));
+
+        // Before cutoff
+        assertFalse(v1.hasMessageAvailable());
+        assertFalse(v2.hasMessageAvailable());
+
+        // Set time after cutoff and set sub-a mark-delete behind entries, 
sub-b beyond entries
+        env.time.set(600);
+        ((InMemoryTopicDelayedDeliveryTrackerView) v1)
+                .updateMarkDeletePosition(PositionFactory.create(0L, 0L)); // 
visible for sub-a
+        ((InMemoryTopicDelayedDeliveryTrackerView) v2)
+                .updateMarkDeletePosition(PositionFactory.create(1L, 5L)); // 
filtered for sub-b
+
+        // Invoke manager timer task directly
+        manager.run(mock(Timeout.class));
+
+        // Only d1 should be triggered
+        verify(d1, times(1)).readMoreEntriesAsync();
+        verify(d2, times(0)).readMoreEntriesAsync();
+
+        v1.close();
+        v2.close();
+    }
+
+    @Test
+    public void testPauseWithFixedDelays() throws Exception {
+        TestEnv env = new TestEnv();
+        long lookahead = 5;
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 10, 
env.clock, true, lookahead);
+
+        ManagedCursor cursor = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers dispatcher = 
newDispatcher("sub-a", cursor);
+        InMemoryTopicDelayedDeliveryTrackerView view =
+                (InMemoryTopicDelayedDeliveryTrackerView) 
manager.createOrGetView(dispatcher);
+
+        // Add strictly increasing deliverAt times (fixed delay scenario)
+        env.time.set(0);
+        for (int i = 1; i <= lookahead; i++) {
+            assertTrue(view.addMessage(i, i, i * 100L));
+        }
+        assertTrue(view.shouldPauseAllDeliveries());
+
+        // Move time forward to make messages available -> pause should be 
lifted
+        env.time.set(lookahead * 100 + 1);
+        assertFalse(view.shouldPauseAllDeliveries());
+
+        view.close();
+    }
+
+    @Test
+    public void testDynamicTickTimeUpdateAffectsCutoff() throws Exception {
+        TestEnv env = new TestEnv();
+        // non-strict mode: cutoff = now + tick
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, 
env.clock, false, 0);
+
+        ManagedCursor cursor = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers dispatcher = 
newDispatcher("sub-a", cursor);
+        DelayedDeliveryTracker view = manager.createOrGetView(dispatcher);
+
+        env.time.set(1000);
+        // deliverAt within current tick window -> rejected
+        assertFalse(view.addMessage(1, 1, 1050)); // cutoff=1100
+        assertEquals(view.getNumberOfDelayedMessages(), 0);
+
+        // shrink tick: cutoff reduces -> same deliverAt becomes accepted
+        view.resetTickTime(10);
+        assertTrue(view.addMessage(1, 1, 1050)); // cutoff=1010
+        assertEquals(view.getNumberOfDelayedMessages(), 1);
+
+        view.close();
+    }
+
+    @Test
+    public void testMinMarkDeleteAcrossSubscriptions() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, 
env.clock, true, 0);
+
+        ManagedCursor c1 = mock(ManagedCursor.class);
+        ManagedCursor c2 = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d1 = 
newDispatcher("sub-a", c1);
+        AbstractPersistentDispatcherMultipleConsumers d2 = 
newDispatcher("sub-b", c2);
+        InMemoryTopicDelayedDeliveryTrackerView v1 =
+                (InMemoryTopicDelayedDeliveryTrackerView) 
manager.createOrGetView(d1);
+        InMemoryTopicDelayedDeliveryTrackerView v2 =
+                (InMemoryTopicDelayedDeliveryTrackerView) 
manager.createOrGetView(d2);
+
+        env.time.set(0);
+        assertTrue(v1.addMessage(1, 1, 100));
+        assertTrue(v1.addMessage(1, 2, 100));
+        assertTrue(v1.addMessage(2, 1, 100));
+        assertEquals(v1.getNumberOfDelayedMessages(), 3);
+
+        // c1 behind, c2 ahead (set via view so manager receives updates)
+        v1.updateMarkDeletePosition(PositionFactory.create(0L, 0L));
+        v2.updateMarkDeletePosition(PositionFactory.create(10L, 10L));
+
+        env.time.set(200);
+        // Trigger v2 read + prune attempt; min mark-delete still from c1 => 
no prune
+        v2.getScheduledMessages(10);
+        assertEquals(v1.getNumberOfDelayedMessages(), 3);
+
+        // Advance c1 mark-delete beyond (1,2)
+        v1.updateMarkDeletePosition(PositionFactory.create(1L, 2L));
+        v1.getScheduledMessages(10);
+        long start2 = System.currentTimeMillis();
+        while (v1.getNumberOfDelayedMessages() != 1 && 
System.currentTimeMillis() - start2 < 2000) {
+            try {
+                Thread.sleep(30);
+            } catch (InterruptedException ignored) {
+
+            }
+            v1.getScheduledMessages(1);
+        }
+        assertEquals(v1.getNumberOfDelayedMessages(), 1);
+
+        v1.close();
+        v2.close();
+    }
+
+    @Test
+    public void testTimerSchedulingWindowAlignment() throws Exception {
+        TestEnv env = new TestEnv();
+        long tickMs = 1000;
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 
tickMs, env.clock, true, 0);
+
+        ManagedCursor cursor = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers dispatcher = 
newDispatcher("sub-a", cursor);
+        DelayedDeliveryTracker view = manager.createOrGetView(dispatcher);
+
+        // Establish lastTickRun via a manual run at t=10000
+        env.time.set(10000);
+        manager.run(mock(Timeout.class));
+
+        // Add with deliverAt=10001, but tick window alignment should schedule 
at >= 11000
+        assertTrue(view.addMessage(1, 1, 10001));
+        long scheduledAt = env.tasks.firstKey();
+        assertTrue(scheduledAt >= 11000, "scheduledAt=" + scheduledAt);
+
+        // If no recent tick run, deliverAt should determine
+        env.tasks.clear();
+        env.time.set(20000);
+        // No run -> lastTickRun remains 10000; bucketStart(20005)=20000; 
schedule aligns to bucket start immediately
+        assertTrue(view.addMessage(1, 2, 20005));
+        long scheduledAt2 = env.tasks.firstKey();
+        assertEquals(scheduledAt2, 20000);
+
+        view.close();
+    }
+
+    @Test
+    public void testBufferMemoryUsageAndCleanup() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, 
env.clock, true, 0);
+
+        ManagedCursor c = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d = 
newDispatcher("sub-a", c);
+        DelayedDeliveryTracker v = manager.createOrGetView(d);
+
+        env.time.set(0);
+        assertTrue(v.addMessage(1, 1, 10));
+        assertTrue(v.getBufferMemoryUsage() > 0);
+
+        v.close();
+        // After last subscription closes, manager should clear index and 
memory
+        assertEquals(manager.topicDelayedMessages(), 0);
+        assertEquals(manager.topicBufferMemoryBytes(), 0);
+    }
+
+    @Test
+    public void testGetScheduledMessagesLimit() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, 
env.clock, true, 0);
+        ManagedCursor cursor = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers dispatcher = 
newDispatcher("sub", cursor);
+        DelayedDeliveryTracker view = manager.createOrGetView(dispatcher);
+
+        env.time.set(1000);
+        for (int i = 0; i < 10; i++) {
+            assertTrue(view.addMessage(1, i, 1001));
+        }
+        env.time.set(2000);
+        NavigableSet<Position> positions = view.getScheduledMessages(3);
+        assertEquals(positions.size(), 3);
+
+        Position prev = null;
+        for (Position p : positions) {
+            if (prev != null) {
+                assertTrue(prev.compareTo(p) < 0);
+            }
+            prev = p;
+        }
+
+        view.close();
+    }
+
+    @Test
+    public void testHasMessageAvailableIgnoresMarkDelete() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, 
env.clock, true, 0);
+        ManagedCursor cursor = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers dispatcher = 
newDispatcher("s", cursor);
+        InMemoryTopicDelayedDeliveryTrackerView view =
+                (InMemoryTopicDelayedDeliveryTrackerView) 
manager.createOrGetView(dispatcher);
+
+        env.time.set(900);
+        assertTrue(view.addMessage(1, 1, 1000));
+        env.time.set(1000);
+        view.updateMarkDeletePosition(PositionFactory.create(1, 1));
+        assertTrue(view.hasMessageAvailable());
+        assertTrue(view.getScheduledMessages(10).isEmpty());
+
+        view.close();
+    }
+
+    @Test
+    public void testCrossBucketDuplicatesDedupOnRead() throws Exception {
+        TestEnv env = new TestEnv();
+        long tick = 256;
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 
tick, env.clock, true, 0);
+
+        ManagedCursor c1 = mock(ManagedCursor.class);
+        ManagedCursor c2 = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("s1", 
c1);
+        AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("s2", 
c2);
+        DelayedDeliveryTracker v1 = manager.createOrGetView(d1);
+        DelayedDeliveryTracker v2 = manager.createOrGetView(d2);
+
+        env.time.set(1000);
+        long deliverAt = 1023;
+        assertTrue(v1.addMessage(9, 9, deliverAt));
+        long before = manager.topicBufferMemoryBytes();
+
+        v2.resetTickTime(32);
+        assertTrue(v2.addMessage(9, 9, deliverAt));
+
+        env.time.set(2000);
+        NavigableSet<Position> scheduled = v1.getScheduledMessages(10);
+        assertEquals(scheduled.size(), 1);
+        assertTrue(manager.topicDelayedMessages() >= 1);
+        assertTrue(manager.topicBufferMemoryBytes() > before);
+
+        v1.close();
+        v2.close();
+    }
+
+    @Test
+    public void testClearIsNoOp() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, 
env.clock, true, 0);
+        ManagedCursor c = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", 
c);
+        DelayedDeliveryTracker v = manager.createOrGetView(d);
+
+        env.time.set(0);
+        assertTrue(v.addMessage(1, 1, 10));
+        long before = manager.topicDelayedMessages();
+        v.clear().join();
+        assertEquals(manager.topicDelayedMessages(), before);
+        v.close();
+    }
+
+    @Test
+    public void testMultiSubscriptionCloseDoesNotClear() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, 
env.clock, true, 0);
+
+        ManagedCursor c1 = mock(ManagedCursor.class);
+        ManagedCursor c2 = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("s1", 
c1);
+        AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("s2", 
c2);
+        DelayedDeliveryTracker v1 = manager.createOrGetView(d1);
+        DelayedDeliveryTracker v2 = manager.createOrGetView(d2);
+
+        env.time.set(0);
+        assertTrue(v1.addMessage(1, 1, 10));
+        assertTrue(manager.topicDelayedMessages() > 0);
+
+        v1.close();
+        assertTrue(manager.topicDelayedMessages() > 0);
+        // Move time forward so remaining view can read
+        env.time.set(20);
+        assertFalse(v2.getScheduledMessages(10).isEmpty());
+
+        v2.close();
+        assertEquals(manager.topicDelayedMessages(), 0);
+    }
+
+    @Test
+    public void testBoundaryInputsRejected() throws Exception {
+        TestEnv env = new TestEnv();
+        ManagedCursor c = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", 
c);
+
+        InMemoryTopicDelayedDeliveryTrackerManager mStrict =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, 
env.clock, true, 0);
+        DelayedDeliveryTracker vStrict = mStrict.createOrGetView(d);
+        env.time.set(1000);
+        assertFalse(vStrict.addMessage(1, 1, -1));
+        assertFalse(vStrict.addMessage(1, 2, 1000));
+        vStrict.close();
+
+        InMemoryTopicDelayedDeliveryTrackerManager mNonStrict =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, 
env.clock, false, 0);
+        DelayedDeliveryTracker vNon = mNonStrict.createOrGetView(d);
+        env.time.set(1000);
+        assertFalse(vNon.addMessage(1, 3, 1100));
+        vNon.close();
+    }
+
+    private static void expectIllegalState(Runnable r) {
+        try {
+            r.run();
+            org.testng.Assert.fail("Expected IllegalStateException");
+        } catch (IllegalStateException expected) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testClosedViewThrowsOnOperations() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, 
env.clock, true, 0);
+        ManagedCursor c = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", 
c);
+        InMemoryTopicDelayedDeliveryTrackerView v =
+                (InMemoryTopicDelayedDeliveryTrackerView) 
manager.createOrGetView(d);
+        v.close();
+
+        expectIllegalState(() -> v.addMessage(1, 1, 10));
+        expectIllegalState(v::hasMessageAvailable);
+        expectIllegalState(() -> v.getScheduledMessages(1));
+        expectIllegalState(v::clear);
+    }
+
+    @Test
+    public void testRescheduleOnEarlierDeliverAt() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, 
env.clock, true, 0);
+        ManagedCursor c = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", 
c);
+        DelayedDeliveryTracker v = manager.createOrGetView(d);
+
+        env.time.set(0);
+        assertTrue(v.addMessage(1, 1, 10));
+        assertEquals(env.tasks.firstKey().longValue(), 10L);
+
+        assertTrue(v.addMessage(1, 2, 5));
+        assertEquals(env.tasks.size(), 1);
+        assertEquals(env.tasks.firstKey().longValue(), 5L);
+
+        v.close();
+    }
+
+    @Test
+    public void testEmptyIndexCancelsTimerOnClose() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, 
env.clock, true, 0);
+        ManagedCursor c = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", 
c);
+        DelayedDeliveryTracker v = manager.createOrGetView(d);
+
+        env.time.set(0);
+        assertTrue(v.addMessage(1, 1, 1000));
+        assertFalse(env.tasks.isEmpty());
+        v.close();
+        assertTrue(env.tasks.isEmpty());
+    }
+
+    @Test
+    public void testMemoryGrowthAndPruneShrink() throws Exception {
+        TestEnv env = new TestEnv();
+        InMemoryTopicDelayedDeliveryTrackerManager manager =
+                new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 10, 
env.clock, true, 0);
+        ManagedCursor c = mock(ManagedCursor.class);
+        AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", 
c);
+        InMemoryTopicDelayedDeliveryTrackerView v =
+                (InMemoryTopicDelayedDeliveryTrackerView) 
manager.createOrGetView(d);
+
+        env.time.set(0);
+        for (int i = 0; i < 50; i++) {
+            assertTrue(v.addMessage(1, i, 100));
+        }
+        long memBefore = manager.topicBufferMemoryBytes();
+        assertTrue(memBefore > 0);
+
+        env.time.set(200);
+        v.updateMarkDeletePosition(PositionFactory.create(1, 25));
+        v.getScheduledMessages(100);
+        // Wait for prune-by-time throttling window and trigger reads to allow 
prune to occur
+        long memAfter = manager.topicBufferMemoryBytes();
+        long startWall = System.currentTimeMillis();
+        while (memAfter >= memBefore && System.currentTimeMillis() - startWall 
< 2000) {

Review Comment:
   This is another polling loop built on Thread.sleep(). The same pattern 
appears multiple times in this test file (for example in 
testSingleSubscriptionBasicFlow and testMinMarkDeleteAcrossSubscriptions). 
Consider extracting a common utility method like 
'waitForCondition(Supplier<Boolean> condition, Duration timeout)' to reduce 
code duplication and improve maintainability.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.LongIterator;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that 
is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription 
scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements 
TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    // Outer: sorted by timestamp for efficient finding of earliest bucket
+    // Inner: per-ledger bitmaps of entry-ids
+    private final ConcurrentSkipListMap<Long, 
Long2ObjectRBTreeMap<Roaring64Bitmap>> delayedMessageMap =
+            new ConcurrentSkipListMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final ConcurrentHashMap<String, SubContext> subscriptionContexts = 
new ConcurrentHashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+    // Last time the TimerTask was triggered
+    private long lastTickRun = 0L;
+
+    // Configuration
+    private long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Prune throttling
+    // Last pruning time
+    private final AtomicLong lastPruneNanos = new AtomicLong(0);
+    // Minimum interval between prunes
+    private final long minPruneIntervalNanos;
+
+    // Fixed-delay detection (parity with legacy behavior)
+    private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0);
+    private volatile boolean messagesHaveFixedDelay = true;
+
+    // Per-bucket locks (timestamp -> lock) for fine-grained concurrency
+    private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new 
ConcurrentHashMap<>();
+
+    // Timer state guard
+    private final ReentrantLock timerLock = new ReentrantLock();
+
+    /**
+     * Per-subscription state kept by the topic-level manager: the dispatcher, the delivery settings
+     * handed in at construction time, and the most recently reported mark-delete position.
+     */
+    @Getter
+    static class SubContext {
+        private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+        private final String subscriptionName;
+        // Volatile: rewritten by the manager when the tick time is updated.
+        private volatile long tickTimeMillis;
+        private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+        private final long fixedDelayDetectionLookahead;
+        private final Clock clock;
+        // Stays null until updateMarkDeletePosition is called for the first time.
+        private volatile Position markDeletePosition;
+
+        SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis,
+                   boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead,
+                   Clock clock) {
+            this.dispatcher = dispatcher;
+            this.subscriptionName = dispatcher.getSubscription().getName();
+            this.clock = clock;
+            this.tickTimeMillis = tickTimeMillis;
+            this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+            this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+        }
+
+        /** Records the latest mark-delete position reported for this subscription. */
+        void updateMarkDeletePosition(Position position) {
+            this.markDeletePosition = position;
+        }
+
+        /**
+         * Returns the timestamp up to which messages count as deliverable: the current clock time in
+         * strict mode, otherwise the current clock time plus one tick.
+         */
+        long getCutoffTime() {
+            final long now = clock.millis();
+            if (isDelayedDeliveryDeliverAtTimeStrict) {
+                return now;
+            }
+            return now + tickTimeMillis;
+        }
+    }
+
+    private final Runnable onEmptyCallback;
+
+    /**
+     * Creates a manager without an on-empty callback by delegating to the six-argument constructor
+     * with {@code null} as the callback.
+     */
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock,
+                                                      boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead,
+                                                      Runnable 
onEmptyCallback) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.onEmptyCallback = onEmptyCallback;
+        // Default prune throttle interval: clamp to [5ms, 50ms] using 
tickTimeMillis as hint
+        // TODO: make configurable if needed
+        long pruneMs = Math.max(5L, Math.min(50L, tickTimeMillis));
+        this.minPruneIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pruneMs);
+    }
+
+    // We bucket messages by aligning the deliverAt timestamp to the start of 
the logical tick window:
+    // bucketStart = deliverAt - (deliverAt % tickTimeMillis)
+    // If tickTimeMillis changes over time, the same message may land in 
different buckets when re-added
+    // by another subscription. Read paths dedup via TreeSet and counts 
include duplicates by design.
+    private long bucketStart(long timestamp) {
+        long t = this.tickTimeMillis;
+        if (t <= 0) {
+            return timestamp;
+        }
+        long mod = timestamp % t;
+        if (mod == 0) {
+            return timestamp;
+        }
+        return timestamp - mod;
+    }
+
+    /**
+     * Returns a new view bound to the subscription context of the given dispatcher. The context is
+     * created on first use and reused for later calls with the same subscription name; the view
+     * object itself is created on every call.
+     */
+    @Override
+    public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        final String name = dispatcher.getSubscription().getName();
+        final SubContext context = subscriptionContexts.computeIfAbsent(
+                name,
+                ignored -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
+                        fixedDelayDetectionLookahead, clock));
+        return new InMemoryTopicDelayedDeliveryTrackerView(this, context);
+    }
+
+    /**
+     * Removes the subscription context registered under the dispatcher's subscription name. When no
+     * context remains, cancels the pending timeout, drops the shared index and its bucket locks,
+     * resets both counters and runs {@code onEmptyCallback} if one was supplied.
+     *
+     * <p>A failing callback is logged at WARN and otherwise ignored, so unregistration stays
+     * best-effort.
+     */
+    @Override
+    public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        subscriptionContexts.remove(subscriptionName);
+        // NOTE(review): remove() followed by isEmpty() is not atomic with createOrGetView(); confirm that
+        // callers serialize registration and unregistration for a topic.
+        if (!subscriptionContexts.isEmpty()) {
+            return;
+        }
+
+        // No more subscriptions: proactively free the index and release memory
+        timerLock.lock();
+        try {
+            if (timeout != null) {
+                timeout.cancel();
+                timeout = null;
+            }
+            currentTimeoutTarget = -1;
+        } finally {
+            timerLock.unlock();
+        }
+        delayedMessageMap.clear();
+        bucketLocks.clear();
+        delayedMessagesCount.set(0);
+        bufferMemoryBytes.set(0);
+
+        if (onEmptyCallback != null) {
+            try {
+                onEmptyCallback.run();
+            } catch (Throwable t) {
+                // Still best-effort, but a failure must be visible without enabling debug logging.
+                log.warn("onEmptyCallback failed after unregistering subscription {}", subscriptionName, t);
+            }
+        }
+    }
+
+    /**
+     * Applies a new tick time to the manager and to every registered subscription context, then
+     * re-evaluates the timer under {@code timerLock}. Does nothing when the value is unchanged.
+     */
+    @Override
+    public void onTickTimeUpdated(long newTickTimeMillis) {
+        final boolean unchanged = this.tickTimeMillis == newTickTimeMillis;
+        if (unchanged) {
+            return;
+        }
+        this.tickTimeMillis = newTickTimeMillis;
+        // Keep every registered subscription context in sync with the manager-level value
+        subscriptionContexts.values().forEach(context -> context.tickTimeMillis = newTickTimeMillis);
+        // The tick time feeds into the timer delay, so reschedule with the new value
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis);
+        }
+    }
+
+    /**
+     * Returns the current value of the {@code bufferMemoryBytes} counter, which
+     * {@code addMessageForSub} adjusts by the change in bitmap size whenever it adds an entry.
+     */
+    @Override
+    public long topicBufferMemoryBytes() {
+        return bufferMemoryBytes.get();
+    }
+
+    /**
+     * Returns the current value of the {@code delayedMessagesCount} counter, which
+     * {@code addMessageForSub} increments for each entry newly added to a bucket.
+     */
+    @Override
+    public long topicDelayedMessages() {
+        return delayedMessagesCount.get();
+    }
+
+    /**
+     * Cancels any pending timeout and discards all state: the delayed message index, the bucket
+     * locks, the subscription contexts and both counters.
+     */
+    @Override
+    public void close() {
+        timerLock.lock();
+        try {
+            final Timeout pending = timeout;
+            if (pending != null) {
+                pending.cancel();
+                timeout = null;
+            }
+            currentTimeoutTarget = -1;
+        } finally {
+            timerLock.unlock();
+        }
+
+        // Drop the index and registry first, then zero the statistics.
+        delayedMessageMap.clear();
+        bucketLocks.clear();
+        subscriptionContexts.clear();
+        delayedMessagesCount.set(0);
+        bufferMemoryBytes.set(0);
+    }
+
+    /**
+     * Add a message to the global delayed message index.
+     */
+    boolean addMessageForSub(SubContext subContext, long ledgerId, long 
entryId, long deliverAt) {
+        if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) {
+            return false;
+        }
+
+        long timestamp = bucketStart(deliverAt);
+        ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new 
ReentrantLock());
+        bLock.lock();
+        try {
+            Long2ObjectRBTreeMap<Roaring64Bitmap> ledgerMap =
+                    delayedMessageMap.computeIfAbsent(timestamp, k -> new 
Long2ObjectRBTreeMap<>());
+            Roaring64Bitmap entryIds = ledgerMap.get(ledgerId);
+            if (entryIds == null) {
+                entryIds = new Roaring64Bitmap();
+                ledgerMap.put(ledgerId, entryIds);
+            }
+            long before = entryIds.getLongSizeInBytes();
+            if (!entryIds.contains(entryId)) {
+                entryIds.add(entryId);
+                delayedMessagesCount.incrementAndGet();
+                long after = entryIds.getLongSizeInBytes();
+                bufferMemoryBytes.addAndGet(after - before);
+            }
+        } finally {
+            bLock.unlock();
+        }
+
+        // Timer update and fixed delay detection
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        checkAndUpdateHighest(deliverAt);
+        return true;
+    }
+
+    private void checkAndUpdateHighest(long deliverAt) {
+        long current;
+        do {
+            current = highestDeliveryTimeTracked.get();
+            if (deliverAt < (current - tickTimeMillis)) {
+                messagesHaveFixedDelay = false;
+            }
+        } while (deliverAt > current && 
!highestDeliveryTimeTracked.compareAndSet(current, deliverAt));
+    }
+
+    /**
+     * Check if there are messages available for a subscription.
+     */
+    boolean hasMessageAvailableForSub(SubContext subContext) {
+        if (delayedMessageMap.isEmpty()) {
+            return false;
+        }
+        // Use firstEntry() to avoid NoSuchElementException on concurrent 
empty map
+        Map.Entry<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> first = 
delayedMessageMap.firstEntry();
+        if (first == null) {
+            return false;
+        }
+        long cutoffTime = subContext.getCutoffTime();
+        long firstTs = first.getKey();
+        return firstTs <= cutoffTime;
+    }
+
+    /**
+     * Get scheduled messages for a subscription.
+     *
+     * <p>Collects at most {@code maxMessages} positions from the buckets whose key is at or before the
+     * subscription's cutoff time, skipping positions at or before the subscription's mark-delete
+     * position. This method does not remove anything from the shared index itself; it ends by calling
+     * {@code maybePruneByTime()}.
+     *
+     * @param subContext subscription whose cutoff time and mark-delete position are applied
+     * @param maxMessages upper bound on the number of positions returned
+     * @return the collected positions in a {@link TreeSet}; empty when nothing qualifies
+     */
+    NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, int maxMessages) {
+        NavigableSet<Position> positions = new TreeSet<>();
+        int remaining = maxMessages;
+
+        long cutoffTime = subContext.getCutoffTime();
+        Position markDelete = subContext.getMarkDeletePosition();
+
+        // Snapshot of buckets up to cutoff and iterate per-bucket with bucket locks
+        List<Long> tsList = new ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet());
+        for (Long ts : tsList) {
+            if (remaining <= 0) {
+                break;
+            }
+            // No lock registered for this key: skip the bucket
+            ReentrantLock bLock = bucketLocks.get(ts);
+            if (bLock == null) {
+                continue;
+            }
+            bLock.lock();
+            try {
+                // Re-read under the lock: the bucket may have disappeared since the snapshot was taken
+                Long2ObjectRBTreeMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(ts);
+                if (ledgerMap == null) {
+                    continue;
+                }
+                for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
+                    if (remaining <= 0) {
+                        break;
+                    }
+                    long ledgerId = ledgerEntry.getLongKey();
+                    Roaring64Bitmap entryIds = ledgerEntry.getValue();
+                    // Whole ledger lies before the mark-delete position
+                    if (markDelete != null && ledgerId < markDelete.getLedgerId()) {
+                        continue;
+                    }
+                    LongIterator it = entryIds.getLongIterator();
+                    while (it.hasNext() && remaining > 0) {
+                        long entryId = it.next();
+                        // Same ledger as the mark-delete position: skip entries at or before it
+                        if (markDelete != null && ledgerId == markDelete.getLedgerId()
+                                && entryId <= markDelete.getEntryId()) {
+                            continue;
+                        }
+                        positions.add(PositionFactory.create(ledgerId, entryId));
+                        remaining--;
+                    }
+                }
+            } finally {
+                bLock.unlock();
+            }
+        }
+
+        // Throttled prune: attempt prune even when result is empty (mark-delete might have filtered everything)
+        // Throttling ensures we don't pay the cost on every call
+        maybePruneByTime();
+
+        return positions;
+    }
+
+    /**
+     * Check if deliveries should be paused for a subscription.
+     */
+    boolean shouldPauseAllDeliveriesForSub(SubContext subContext) {
+        // Parity with legacy: pause if all observed delays are fixed and 
backlog is large enough
+        return subContext.getFixedDelayDetectionLookahead() > 0
+                && messagesHaveFixedDelay
+                && getNumberOfVisibleDelayedMessagesForSub(subContext) >= 
subContext.getFixedDelayDetectionLookahead()
+                && !hasMessageAvailableForSub(subContext);
+    }
+
+    /**
+     * Clear delayed messages for a subscription (no-op for topic-level manager).
+     *
+     * <p>The body is intentionally empty: the global index is not cleared on behalf of a single
+     * subscription.
+     */
+    void clearForSub() {
+        // No-op: we don't clear global index for individual subscriptions
+    }
+
+    /**
+     * Update mark delete position for a subscription.
+     *
+     * <p>Only stores the position on the subscription context; no pruning happens here.
+     *
+     * @param subContext subscription context to update
+     * @param position new mark-delete position
+     */
+    void updateMarkDeletePosition(SubContext subContext, Position position) {
+        // Event-driven update from dispatcher; keep it lightweight (no prune here)
+        subContext.updateMarkDeletePosition(position);
+    }
+
+    private void updateTimerLocked() {
+        // Use firstEntry() to avoid NoSuchElementException on concurrent 
empty map
+        Map.Entry<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> first = 
delayedMessageMap.firstEntry();
+        if (first == null) {
+            if (timeout != null) {
+                currentTimeoutTarget = -1;
+                timeout.cancel();
+                timeout = null;
+            }
+            return;
+        }
+        long nextDeliveryTime = first.getKey();
+        long now = clock.millis();
+        if (timeout != null && nextDeliveryTime == currentTimeoutTarget && 
currentTimeoutTarget >= now) {
+            return;
+        }
+        if (timeout != null) {
+            timeout.cancel();
+        }
+        long delayMillis = nextDeliveryTime - now;
+        if (delayMillis < 0) {
+            // Bucket already in the past: schedule immediate to unblock 
readers
+            delayMillis = 0;
+        }
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
+        currentTimeoutTarget = nextDeliveryTime;
+        timeout = timer.newTimeout(this, calculatedDelayMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Returns the number of delayed messages treated as visible to the subscription. The
+     * {@code subContext} argument is currently unused: the topic-wide count is returned as is.
+     */
+    private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) {
+        // Simplified implementation - returns total count
+        // Could be enhanced to count only messages visible to this subscription
+        return delayedMessagesCount.get();
+    }
+
+    private void pruneByMinMarkDelete() {
+        // Find the minimum mark delete position across all subscriptions.
+        // If any subscription hasn't established a mark-delete yet, skip 
pruning to preserve global visibility.
+        Position minMarkDelete = null;
+        for (SubContext subContext : subscriptionContexts.values()) {
+            Position markDelete = subContext.getMarkDeletePosition();
+            if (markDelete == null) {

Review Comment:
   [nitpick] The comment could be clearer about the rationale for skipping 
pruning when any subscription lacks a mark-delete position. Consider expanding 
the comment to explain that this ensures all subscriptions can still read the 
delayed messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to