Copilot commented on code in PR #24927:
URL: https://github.com/apache/pulsar/pull/24927#discussion_r2483194517


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import java.util.NavigableSet;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+
+/**
+ * View object for a subscription that implements DelayedDeliveryTracker 
interface.
+ * This view forwards all operations to the topic-level manager while 
maintaining
+ * compatibility with existing dispatcher logic.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerView implements 
DelayedDeliveryTracker {
+
+    private final InMemoryTopicDelayedDeliveryTrackerManager manager;
+    private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext 
subContext;
+    private boolean closed = false;
+
+    public 
InMemoryTopicDelayedDeliveryTrackerView(InMemoryTopicDelayedDeliveryTrackerManager
 manager,
+                                                   
InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) {
+        this.manager = manager;
+        this.subContext = subContext;
+    }
+
+    @Override
+    public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
+        checkClosed();
+        return manager.addMessageForSub(subContext, ledgerId, entryId, 
deliveryAt);
+    }
+
+    @Override
+    public boolean hasMessageAvailable() {
+        checkClosed();
+        return manager.hasMessageAvailableForSub(subContext);
+    }
+
+    @Override
+    public long getNumberOfDelayedMessages() {
+        checkClosed();
+        // Return an estimate of visible delayed messages for this subscription
+        // For now, return the total count - could be enhanced to count only 
visible messages

Review Comment:
   The inline comment says this returns an estimate and, for now, the total 
count rather than only the messages visible to this subscription, but the 
method name `getNumberOfDelayedMessages()` suggests an exact per-subscription 
count. This discrepancy could mislead API consumers. Consider documenting the 
behavior in the method's Javadoc, or logging a warning, so that the difference 
from the legacy implementation is explicit.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that 
is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription 
scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements 
TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    // Outer: sorted by timestamp for efficient finding of earliest bucket
+    // Inner: per-ledger bitmaps of entry-ids
+    private final ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, 
Roaring64Bitmap>> delayedMessageMap =
+            new ConcurrentSkipListMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final ConcurrentHashMap<String, SubContext> subscriptionContexts = 
new ConcurrentHashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+    // Last time the TimerTask was triggered
+    private long lastTickRun = 0L;
+
+    // Configuration
+    private long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Fixed-delay detection (parity with legacy behavior)
+    private volatile long highestDeliveryTimeTracked = 0;
+    private volatile boolean messagesHaveFixedDelay = true;
+
+    // Timestamp precision for memory optimization
+    private int timestampPrecisionBitCnt;
+
+    // Per-bucket locks (timestamp -> lock) for fine-grained concurrency
+    private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new 
ConcurrentHashMap<>();
+
+    // Timer state guard
+    private final ReentrantLock timerLock = new ReentrantLock();
+
+
+    /**
+     * Subscription context that holds per-subscription state.
+     */
+    @Getter
+    static class SubContext {
+        private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+        private final String subscriptionName;
+        private long tickTimeMillis;
+        private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+        private final long fixedDelayDetectionLookahead;
+        private final Clock clock;
+        private Position markDeletePosition;
+
+        SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, 
long tickTimeMillis,
+                   boolean isDelayedDeliveryDeliverAtTimeStrict, long 
fixedDelayDetectionLookahead,
+                   Clock clock) {
+            this.dispatcher = dispatcher;
+            this.subscriptionName = dispatcher.getSubscription().getName();
+            this.tickTimeMillis = tickTimeMillis;
+            this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+            this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+            this.clock = clock;
+        }
+
+        void updateMarkDeletePosition(Position position) {
+            this.markDeletePosition = position;
+        }
+
+        long getCutoffTime() {
+            long now = clock.millis();
+            return isDelayedDeliveryDeliverAtTimeStrict ? now : now + 
tickTimeMillis;
+        }
+    }
+
+    private final Runnable onEmptyCallback;
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
+             fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead,
+                                                      Runnable 
onEmptyCallback) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(tickTimeMillis);
+        this.onEmptyCallback = onEmptyCallback;
+    }
+
+    private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
+        int bitCnt = 0;
+        while (tickTimeMillis > 0) {
+            tickTimeMillis >>= 1;
+            bitCnt++;
+        }
+        return bitCnt > 0 ? bitCnt - 1 : 0;
+    }
+
+    private static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
+    }
+
+    @Override
+    public DelayedDeliveryTracker 
createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        SubContext subContext = 
subscriptionContexts.computeIfAbsent(subscriptionName,
+                k -> new SubContext(dispatcher, tickTimeMillis, 
isDelayedDeliveryDeliverAtTimeStrict,
+                        fixedDelayDetectionLookahead, clock));
+        return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);
+    }
+
+    @Override
+    public void unregister(AbstractPersistentDispatcherMultipleConsumers 
dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        subscriptionContexts.remove(subscriptionName);
+        // If no more subscriptions, proactively free index and release memory
+        if (subscriptionContexts.isEmpty()) {
+            timerLock.lock();
+            try {
+                if (timeout != null) {
+                    timeout.cancel();
+                    timeout = null;
+                }
+                currentTimeoutTarget = -1;
+            } finally {
+                timerLock.unlock();
+            }
+            delayedMessageMap.clear();
+            bucketLocks.clear();
+            delayedMessagesCount.set(0);
+            bufferMemoryBytes.set(0);
+            if (onEmptyCallback != null) {
+                try {
+                    onEmptyCallback.run();
+                } catch (Throwable t) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("onEmptyCallback failed", t);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onTickTimeUpdated(long newTickTimeMillis) {
+        if (this.tickTimeMillis == newTickTimeMillis) {
+            return;
+        }
+        this.tickTimeMillis = newTickTimeMillis;
+        // Update precision bits for new tick time (accept old/new buckets 
co-exist)
+        this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(newTickTimeMillis);
+        // Propagate to all subscriptions
+        for (SubContext sc : subscriptionContexts.values()) {
+            sc.tickTimeMillis = newTickTimeMillis;
+        }
+        // Re-evaluate timer scheduling with new tick time
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Updated tickTimeMillis for topic-level delayed delivery 
manager to {} ms", newTickTimeMillis);
+        }
+    }
+
+    @Override
+    public long topicBufferMemoryBytes() {
+        return bufferMemoryBytes.get();
+    }
+
+    @Override
+    public long topicDelayedMessages() {
+        return delayedMessagesCount.get();
+    }
+
+    @Override
+    public void close() {
+        timerLock.lock();
+        try {
+            if (timeout != null) {
+                timeout.cancel();
+                timeout = null;
+            }
+            currentTimeoutTarget = -1;
+        } finally {
+            timerLock.unlock();
+        }
+        delayedMessageMap.clear();
+        bucketLocks.clear();
+        subscriptionContexts.clear();
+        delayedMessagesCount.set(0);
+        bufferMemoryBytes.set(0);
+    }
+
+    // Internal methods for subscription views
+
+    /**
+     * Add a message to the global delayed message index.
+     */
+    boolean addMessageForSub(SubContext subContext, long ledgerId, long 
entryId, long deliverAt) {
+        if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) {
+            return false;
+        }
+
+        long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+        ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new 
ReentrantLock());
+        bLock.lock();
+        try {
+            ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap =
+                    delayedMessageMap.computeIfAbsent(timestamp, k -> new 
ConcurrentHashMap<>());
+            Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k 
-> new Roaring64Bitmap());
+            long before = entryIds.getLongSizeInBytes();
+            if (!entryIds.contains(entryId)) {
+                entryIds.add(entryId);
+                delayedMessagesCount.incrementAndGet();
+                long after = entryIds.getLongSizeInBytes();
+                bufferMemoryBytes.addAndGet(after - before);
+            }
+        } finally {
+            bLock.unlock();
+        }
+
+        // Timer update and fixed delay detection
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        checkAndUpdateHighest(deliverAt);
+        return true;
+    }
+
+    private void checkAndUpdateHighest(long deliverAt) {
+        if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
+            messagesHaveFixedDelay = false;
+        }
+        highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, 
deliverAt);
+    }
+
+    /**
+     * Check if there are messages available for a subscription.
+     */
+    boolean hasMessageAvailableForSub(SubContext subContext) {
+        if (delayedMessageMap.isEmpty()) {
+            return false;
+        }
+        Long firstKey = delayedMessageMap.firstKey();
+        if (firstKey == null) {
+            return false;
+        }
+        long cutoffTime = subContext.getCutoffTime();
+        return firstKey <= cutoffTime;
+    }
+
+    /**
+     * Get scheduled messages for a subscription.
+     */
+    NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, 
int maxMessages) {
+        NavigableSet<Position> positions = new TreeSet<>();
+        int remaining = maxMessages;
+
+        // Refresh mark-delete once outside of any bucket lock
+        refreshMarkDeletePosition(subContext);
+        long cutoffTime = subContext.getCutoffTime();
+        Position markDelete = subContext.getMarkDeletePosition();
+
+        // Snapshot of buckets up to cutoff and iterate per-bucket with bucket 
locks
+        java.util.List<Long> tsList = new 
java.util.ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet());
+        for (Long ts : tsList) {
+            if (remaining <= 0) {
+                break;
+            }
+            ReentrantLock bLock = bucketLocks.get(ts);
+            if (bLock == null) {
+                continue;
+            }
+            bLock.lock();
+            try {
+                ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = 
delayedMessageMap.get(ts);
+                if (ledgerMap == null) {
+                    continue;
+                }
+                for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : 
ledgerMap.entrySet()) {
+                    if (remaining <= 0) {
+                        break;
+                    }
+                    long ledgerId = ledgerEntry.getKey();
+                    Roaring64Bitmap entryIds = ledgerEntry.getValue();
+                    if (markDelete != null && ledgerId < 
markDelete.getLedgerId()) {
+                        continue;
+                    }
+                    org.roaringbitmap.longlong.LongIterator it = 
entryIds.getLongIterator();
+                    while (it.hasNext() && remaining > 0) {
+                        long entryId = it.next();
+                        if (markDelete != null && ledgerId == 
markDelete.getLedgerId()
+                                && entryId <= markDelete.getEntryId()) {
+                            continue;
+                        }
+                        positions.add(PositionFactory.create(ledgerId, 
entryId));
+                        remaining--;
+                    }
+                }
+            } finally {
+                bLock.unlock();
+            }
+        }
+
+        // Prune global index based on min mark-delete across all 
subscriptions (write path)
+        if (!positions.isEmpty()) {
+            pruneByMinMarkDelete();
+        }
+
+        return positions;
+    }
+
+    /**
+     * Check if deliveries should be paused for a subscription.
+     */
+    boolean shouldPauseAllDeliveriesForSub(SubContext subContext) {
+        // Parity with legacy: pause if all observed delays are fixed and 
backlog is large enough
+        return subContext.getFixedDelayDetectionLookahead() > 0
+                && messagesHaveFixedDelay
+                && getNumberOfVisibleDelayedMessagesForSub(subContext) >= 
subContext.getFixedDelayDetectionLookahead()
+                && !hasMessageAvailableForSub(subContext);
+    }
+
+    /**
+     * Clear delayed messages for a subscription (no-op for topic-level 
manager).
+     */
+    void clearForSub() {
+        // No-op: we don't clear global index for individual subscriptions
+    }
+
+    /**
+     * Update mark delete position for a subscription.
+     */
+    void updateMarkDeletePosition(SubContext subContext, Position position) {
+        subContext.updateMarkDeletePosition(position);
+        pruneByMinMarkDelete();
+    }
+
+    // Private helper methods
+
+    private void updateTimerLocked() {
+        if (delayedMessageMap.isEmpty()) {
+            if (timeout != null) {
+                currentTimeoutTarget = -1;
+                timeout.cancel();
+                timeout = null;
+            }
+            return;
+        }
+        Long nextKey = delayedMessageMap.firstKey();
+        if (nextKey == null) {
+            return;
+        }
+        long nextDeliveryTime = nextKey;
+        if (nextDeliveryTime == currentTimeoutTarget) {
+            return;
+        }
+        if (timeout != null) {
+            timeout.cancel();
+        }
+        long now = clock.millis();
+        long delayMillis = nextDeliveryTime - now;
+        if (delayMillis < 0) {
+            return;
+        }
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
+        currentTimeoutTarget = nextDeliveryTime;
+        timeout = timer.newTimeout(this, calculatedDelayMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    private void updateBufferMemoryEstimate() {
+        // No-op in incremental mode (kept for compatibility)
+    }
+
+    private long getNumberOfVisibleDelayedMessagesForSub(SubContext 
subContext) {
+        // Simplified implementation - returns total count
+        // Could be enhanced to count only messages visible to this 
subscription
+        return delayedMessagesCount.get();
+    }
+
+    private void pruneByMinMarkDelete() {
+        // Find the minimum mark delete position across all subscriptions
+        Position minMarkDelete = null;
+        for (SubContext subContext : subscriptionContexts.values()) {
+            Position markDelete = subContext.getMarkDeletePosition();
+            if (markDelete != null) {
+                if (minMarkDelete == null || 
markDelete.compareTo(minMarkDelete) < 0) {
+                    minMarkDelete = markDelete;
+                }
+            }
+        }
+
+        if (minMarkDelete == null) {
+            return;
+        }
+
+        // No idempotency set to clean (Option A): rely on per-bitmap removal 
below
+
+        // Prune per bucket under bucket lock
+        for (Long ts : new ArrayList<>(delayedMessageMap.keySet())) {
+            ReentrantLock bLock = bucketLocks.get(ts);
+            if (bLock == null) {
+                continue;
+            }
+            bLock.lock();
+            try {
+                ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = 
delayedMessageMap.get(ts);
+                if (ledgerMap == null) {
+                    continue;
+                }
+                ArrayList<Long> ledgersToRemove = new ArrayList<>();
+                for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : 
ledgerMap.entrySet()) {
+                    long ledgerId = ledgerEntry.getKey();
+                    Roaring64Bitmap entryIds = ledgerEntry.getValue();
+                    if (ledgerId < minMarkDelete.getLedgerId()) {
+                        long bytes = entryIds.getLongSizeInBytes();
+                        
delayedMessagesCount.addAndGet(-entryIds.getLongCardinality());
+                        bufferMemoryBytes.addAndGet(-bytes);
+                        ledgersToRemove.add(ledgerId);
+                    } else if (ledgerId == minMarkDelete.getLedgerId()) {
+                        long before = entryIds.getLongSizeInBytes();
+                        long removedCount = 0;
+                        org.roaringbitmap.longlong.LongIterator it = 
entryIds.getLongIterator();
+                        java.util.ArrayList<Long> toRemove = new 
java.util.ArrayList<>();
+                        while (it.hasNext()) {
+                            long e = it.next();
+                            if (e <= minMarkDelete.getEntryId()) {
+                                toRemove.add(e);
+                            }
+                        }
+                        for (Long e : toRemove) {
+                            entryIds.removeLong(e);
+                            removedCount++;
+                        }
+                        long after = entryIds.getLongSizeInBytes();
+                        delayedMessagesCount.addAndGet(-removedCount);
+                        bufferMemoryBytes.addAndGet(after - before);
+                        if (entryIds.isEmpty()) {
+                            ledgersToRemove.add(ledgerId);
+                        }
+                    }
+                }
+                for (Long ledgerId : ledgersToRemove) {
+                    ledgerMap.remove(ledgerId);
+                }
+                if (ledgerMap.isEmpty()) {
+                    delayedMessageMap.remove(ts);
+                    bucketLocks.remove(ts);
+                }
+            } finally {
+                bLock.unlock();
+            }
+        }
+    }
+
+    // idempotency set removed per Option A
+

Review Comment:
   This orphaned comment references 'Option A' without context, making it 
unclear to future maintainers. Either remove the comment or expand it to 
explain what 'Option A' was and why the idempotency set was removed.
   ```suggestion
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that 
is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription 
scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements 
TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    // Outer: sorted by timestamp for efficient finding of earliest bucket
+    // Inner: per-ledger bitmaps of entry-ids
+    private final ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, 
Roaring64Bitmap>> delayedMessageMap =
+            new ConcurrentSkipListMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final ConcurrentHashMap<String, SubContext> subscriptionContexts = 
new ConcurrentHashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+    // Last time the TimerTask was triggered
+    private long lastTickRun = 0L;
+
+    // Configuration
+    private long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Fixed-delay detection (parity with legacy behavior)
+    private volatile long highestDeliveryTimeTracked = 0;
+    private volatile boolean messagesHaveFixedDelay = true;
+
+    // Timestamp precision for memory optimization
+    private int timestampPrecisionBitCnt;
+
+    // Per-bucket locks (timestamp -> lock) for fine-grained concurrency
+    private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new 
ConcurrentHashMap<>();

Review Comment:
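   The `bucketLocks` map gains an entry for every new timestamp bucket, and 
entries are released only when `pruneByMinMarkDelete` removes the bucket from 
`delayedMessageMap` (which already calls `bucketLocks.remove(ts)`) or when 
`unregister`/`close` clears the whole map. If timestamps are diverse or pruning 
is delayed, both maps can keep growing and hold memory for a long time. 
Consider making sure that every path that removes a timestamp bucket from 
`delayedMessageMap` also removes its lock, and that pruning runs often enough 
to bound the number of live buckets.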



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java:
##########
@@ -66,13 +72,35 @@ public DelayedDeliveryTracker 
newTracker(AbstractPersistentDispatcherMultipleCon
     }
 
     @VisibleForTesting
-    InMemoryDelayedDeliveryTracker 
newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
-        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis,
-                isDelayedDeliveryDeliverAtTimeStrict, 
fixedDelayDetectionLookahead);
+    DelayedDeliveryTracker 
newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String topicName = dispatcher.getTopic().getName();
+
+        // Get or create topic-level manager for this topic with onEmpty 
callback to remove from cache
+        final TopicDelayedDeliveryTrackerManager[] holder = new 
TopicDelayedDeliveryTrackerManager[1];
+        TopicDelayedDeliveryTrackerManager manager = 
topicManagers.computeIfAbsent(topicName, k -> {
+            InMemoryTopicDelayedDeliveryTrackerManager m = new 
InMemoryTopicDelayedDeliveryTrackerManager(
+                    timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
+                    fixedDelayDetectionLookahead, () -> 
topicManagers.remove(topicName, holder[0]));
+            holder[0] = m;
+            return m;
+        });

Review Comment:
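   This pattern, which uses a one-element holder array so that the onEmpty 
callback can refer to the manager being created, is hard to follow. Note that 
the callback is created only inside the mapping function: when 
`computeIfAbsent` returns an existing manager, the local `holder[0]` stays null 
but is never used, and the existing manager's callback still refers to the 
holder from the call that created it. Consider simplifying by passing only the 
topic name to the callback: `() -> topicManagers.remove(topicName)`. Be aware 
that this drops the conditional removal of `remove(topicName, holder[0])`, so 
it removes whichever manager is currently mapped to the topic.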
   ```suggestion
           TopicDelayedDeliveryTrackerManager manager = 
topicManagers.computeIfAbsent(topicName, k -> 
               new InMemoryTopicDelayedDeliveryTrackerManager(
                   timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
                   fixedDelayDetectionLookahead, () -> 
topicManagers.remove(topicName)
               )
           );
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that 
is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription 
scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements 
TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    // Outer: sorted by timestamp for efficient finding of earliest bucket
+    // Inner: per-ledger bitmaps of entry-ids
+    private final ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, 
Roaring64Bitmap>> delayedMessageMap =
+            new ConcurrentSkipListMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final ConcurrentHashMap<String, SubContext> subscriptionContexts = 
new ConcurrentHashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+    // Last time the TimerTask was triggered
+    private long lastTickRun = 0L;
+
+    // Configuration
+    private long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Fixed-delay detection (parity with legacy behavior)
+    private volatile long highestDeliveryTimeTracked = 0;
+    private volatile boolean messagesHaveFixedDelay = true;
+
+    // Timestamp precision for memory optimization
+    private int timestampPrecisionBitCnt;
+
+    // Per-bucket locks (timestamp -> lock) for fine-grained concurrency
+    private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new 
ConcurrentHashMap<>();
+
+    // Timer state guard
+    private final ReentrantLock timerLock = new ReentrantLock();
+
+
+    /**
+     * Subscription context that holds per-subscription state.
+     */
+    @Getter
+    static class SubContext {
+        private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+        private final String subscriptionName;
+        private long tickTimeMillis;
+        private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+        private final long fixedDelayDetectionLookahead;
+        private final Clock clock;
+        private Position markDeletePosition;
+
+        SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, 
long tickTimeMillis,
+                   boolean isDelayedDeliveryDeliverAtTimeStrict, long 
fixedDelayDetectionLookahead,
+                   Clock clock) {
+            this.dispatcher = dispatcher;
+            this.subscriptionName = dispatcher.getSubscription().getName();
+            this.tickTimeMillis = tickTimeMillis;
+            this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+            this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+            this.clock = clock;
+        }
+
+        void updateMarkDeletePosition(Position position) {
+            this.markDeletePosition = position;
+        }
+
+        long getCutoffTime() {
+            long now = clock.millis();
+            return isDelayedDeliveryDeliverAtTimeStrict ? now : now + 
tickTimeMillis;
+        }
+    }
+
+    private final Runnable onEmptyCallback;
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
+             fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead,
+                                                      Runnable 
onEmptyCallback) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(tickTimeMillis);
+        this.onEmptyCallback = onEmptyCallback;
+    }
+
+    private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
+        int bitCnt = 0;
+        while (tickTimeMillis > 0) {
+            tickTimeMillis >>= 1;
+            bitCnt++;
+        }
+        return bitCnt > 0 ? bitCnt - 1 : 0;
+    }
+
+    private static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
+    }
+
+    @Override
+    public DelayedDeliveryTracker 
createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        SubContext subContext = 
subscriptionContexts.computeIfAbsent(subscriptionName,
+                k -> new SubContext(dispatcher, tickTimeMillis, 
isDelayedDeliveryDeliverAtTimeStrict,
+                        fixedDelayDetectionLookahead, clock));
+        return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);
+    }
+
+    @Override
+    public void unregister(AbstractPersistentDispatcherMultipleConsumers 
dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        subscriptionContexts.remove(subscriptionName);
+        // If no more subscriptions, proactively free index and release memory
+        if (subscriptionContexts.isEmpty()) {
+            timerLock.lock();
+            try {
+                if (timeout != null) {
+                    timeout.cancel();
+                    timeout = null;
+                }
+                currentTimeoutTarget = -1;
+            } finally {
+                timerLock.unlock();
+            }
+            delayedMessageMap.clear();
+            bucketLocks.clear();
+            delayedMessagesCount.set(0);
+            bufferMemoryBytes.set(0);
+            if (onEmptyCallback != null) {
+                try {
+                    onEmptyCallback.run();
+                } catch (Throwable t) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("onEmptyCallback failed", t);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onTickTimeUpdated(long newTickTimeMillis) {
+        if (this.tickTimeMillis == newTickTimeMillis) {
+            return;
+        }
+        this.tickTimeMillis = newTickTimeMillis;
+        // Update precision bits for new tick time (accept old/new buckets 
co-exist)
+        this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(newTickTimeMillis);
+        // Propagate to all subscriptions
+        for (SubContext sc : subscriptionContexts.values()) {
+            sc.tickTimeMillis = newTickTimeMillis;
+        }
+        // Re-evaluate timer scheduling with new tick time
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Updated tickTimeMillis for topic-level delayed delivery 
manager to {} ms", newTickTimeMillis);
+        }
+    }
+
+    @Override
+    public long topicBufferMemoryBytes() {
+        return bufferMemoryBytes.get();
+    }
+
+    @Override
+    public long topicDelayedMessages() {
+        return delayedMessagesCount.get();
+    }
+
+    @Override
+    public void close() {
+        timerLock.lock();
+        try {
+            if (timeout != null) {
+                timeout.cancel();
+                timeout = null;
+            }
+            currentTimeoutTarget = -1;
+        } finally {
+            timerLock.unlock();
+        }
+        delayedMessageMap.clear();
+        bucketLocks.clear();
+        subscriptionContexts.clear();
+        delayedMessagesCount.set(0);
+        bufferMemoryBytes.set(0);
+    }
+
+    // Internal methods for subscription views
+
+    /**
+     * Add a message to the global delayed message index.
+     */
+    boolean addMessageForSub(SubContext subContext, long ledgerId, long 
entryId, long deliverAt) {
+        if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) {
+            return false;
+        }
+
+        long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+        ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new 
ReentrantLock());
+        bLock.lock();
+        try {
+            ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap =
+                    delayedMessageMap.computeIfAbsent(timestamp, k -> new 
ConcurrentHashMap<>());
+            Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k 
-> new Roaring64Bitmap());
+            long before = entryIds.getLongSizeInBytes();
+            if (!entryIds.contains(entryId)) {
+                entryIds.add(entryId);
+                delayedMessagesCount.incrementAndGet();
+                long after = entryIds.getLongSizeInBytes();
+                bufferMemoryBytes.addAndGet(after - before);
+            }
+        } finally {
+            bLock.unlock();
+        }
+
+        // Timer update and fixed delay detection
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        checkAndUpdateHighest(deliverAt);
+        return true;
+    }
+
+    private void checkAndUpdateHighest(long deliverAt) {
+        if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
+            messagesHaveFixedDelay = false;
+        }
+        highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, 
deliverAt);
+    }
+
+    /**
+     * Check if there are messages available for a subscription.
+     */
+    boolean hasMessageAvailableForSub(SubContext subContext) {
+        if (delayedMessageMap.isEmpty()) {
+            return false;
+        }
+        Long firstKey = delayedMessageMap.firstKey();
+        if (firstKey == null) {
+            return false;
+        }
+        long cutoffTime = subContext.getCutoffTime();
+        return firstKey <= cutoffTime;
+    }
+
+    /**
+     * Get scheduled messages for a subscription.
+     */
+    NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, 
int maxMessages) {
+        NavigableSet<Position> positions = new TreeSet<>();
+        int remaining = maxMessages;
+
+        // Refresh mark-delete once outside of any bucket lock
+        refreshMarkDeletePosition(subContext);
+        long cutoffTime = subContext.getCutoffTime();
+        Position markDelete = subContext.getMarkDeletePosition();
+
+        // Snapshot of buckets up to cutoff and iterate per-bucket with bucket 
locks
+        java.util.List<Long> tsList = new 
java.util.ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet());
+        for (Long ts : tsList) {
+            if (remaining <= 0) {
+                break;
+            }
+            ReentrantLock bLock = bucketLocks.get(ts);
+            if (bLock == null) {
+                continue;
+            }
+            bLock.lock();
+            try {
+                ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = 
delayedMessageMap.get(ts);
+                if (ledgerMap == null) {
+                    continue;
+                }
+                for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : 
ledgerMap.entrySet()) {
+                    if (remaining <= 0) {
+                        break;
+                    }
+                    long ledgerId = ledgerEntry.getKey();
+                    Roaring64Bitmap entryIds = ledgerEntry.getValue();
+                    if (markDelete != null && ledgerId < 
markDelete.getLedgerId()) {
+                        continue;
+                    }
+                    org.roaringbitmap.longlong.LongIterator it = 
entryIds.getLongIterator();
+                    while (it.hasNext() && remaining > 0) {
+                        long entryId = it.next();
+                        if (markDelete != null && ledgerId == 
markDelete.getLedgerId()
+                                && entryId <= markDelete.getEntryId()) {
+                            continue;
+                        }
+                        positions.add(PositionFactory.create(ledgerId, 
entryId));
+                        remaining--;
+                    }
+                }
+            } finally {
+                bLock.unlock();
+            }
+        }
+
+        // Prune global index based on min mark-delete across all 
subscriptions (write path)
+        if (!positions.isEmpty()) {
+            pruneByMinMarkDelete();
+        }
+
+        return positions;
+    }
+
+    /**
+     * Check if deliveries should be paused for a subscription.
+     */
+    boolean shouldPauseAllDeliveriesForSub(SubContext subContext) {
+        // Parity with legacy: pause if all observed delays are fixed and 
backlog is large enough
+        return subContext.getFixedDelayDetectionLookahead() > 0
+                && messagesHaveFixedDelay
+                && getNumberOfVisibleDelayedMessagesForSub(subContext) >= 
subContext.getFixedDelayDetectionLookahead()
+                && !hasMessageAvailableForSub(subContext);
+    }
+
+    /**
+     * Clear delayed messages for a subscription (no-op for topic-level 
manager).
+     */
+    void clearForSub() {
+        // No-op: we don't clear global index for individual subscriptions
+    }
+
+    /**
+     * Update mark delete position for a subscription.
+     */
+    void updateMarkDeletePosition(SubContext subContext, Position position) {
+        subContext.updateMarkDeletePosition(position);
+        pruneByMinMarkDelete();
+    }
+
+    // Private helper methods
+
+    private void updateTimerLocked() {
+        if (delayedMessageMap.isEmpty()) {
+            if (timeout != null) {
+                currentTimeoutTarget = -1;
+                timeout.cancel();
+                timeout = null;
+            }
+            return;
+        }
+        Long nextKey = delayedMessageMap.firstKey();
+        if (nextKey == null) {
+            return;
+        }
+        long nextDeliveryTime = nextKey;
+        if (nextDeliveryTime == currentTimeoutTarget) {
+            return;
+        }
+        if (timeout != null) {
+            timeout.cancel();
+        }
+        long now = clock.millis();
+        long delayMillis = nextDeliveryTime - now;
+        if (delayMillis < 0) {
+            return;
+        }
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
+        currentTimeoutTarget = nextDeliveryTime;
+        timeout = timer.newTimeout(this, calculatedDelayMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    private void updateBufferMemoryEstimate() {
+        // No-op in incremental mode (kept for compatibility)
+    }
+

Review Comment:
   This empty private method serves no purpose and adds confusion. Because it 
is never called, the "kept for compatibility" comment does not justify keeping 
it; it should be removed to improve code clarity.
   ```suggestion
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that 
is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription 
scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements 
TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    // Outer: sorted by timestamp for efficient finding of earliest bucket
+    // Inner: per-ledger bitmaps of entry-ids
+    private final ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, 
Roaring64Bitmap>> delayedMessageMap =
+            new ConcurrentSkipListMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final ConcurrentHashMap<String, SubContext> subscriptionContexts = 
new ConcurrentHashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+    // Last time the TimerTask was triggered
+    private long lastTickRun = 0L;
+
+    // Configuration
+    private long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Fixed-delay detection (parity with legacy behavior)
+    private volatile long highestDeliveryTimeTracked = 0;
+    private volatile boolean messagesHaveFixedDelay = true;
+
+    // Timestamp precision for memory optimization
+    private int timestampPrecisionBitCnt;
+
+    // Per-bucket locks (timestamp -> lock) for fine-grained concurrency
+    private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new 
ConcurrentHashMap<>();
+
+    // Timer state guard
+    private final ReentrantLock timerLock = new ReentrantLock();
+
+
+    /**
+     * Subscription context that holds per-subscription state.
+     */
+    @Getter
+    static class SubContext {
+        private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+        private final String subscriptionName;
+        private long tickTimeMillis;
+        private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+        private final long fixedDelayDetectionLookahead;
+        private final Clock clock;
+        private Position markDeletePosition;
+
+        SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, 
long tickTimeMillis,
+                   boolean isDelayedDeliveryDeliverAtTimeStrict, long 
fixedDelayDetectionLookahead,
+                   Clock clock) {
+            this.dispatcher = dispatcher;
+            this.subscriptionName = dispatcher.getSubscription().getName();
+            this.tickTimeMillis = tickTimeMillis;
+            this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+            this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+            this.clock = clock;
+        }
+
+        void updateMarkDeletePosition(Position position) {
+            this.markDeletePosition = position;
+        }
+
+        long getCutoffTime() {
+            long now = clock.millis();
+            return isDelayedDeliveryDeliverAtTimeStrict ? now : now + 
tickTimeMillis;
+        }
+    }
+
+    private final Runnable onEmptyCallback;
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
+             fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead,
+                                                      Runnable 
onEmptyCallback) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(tickTimeMillis);
+        this.onEmptyCallback = onEmptyCallback;
+    }
+
+    private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
+        int bitCnt = 0;
+        while (tickTimeMillis > 0) {
+            tickTimeMillis >>= 1;
+            bitCnt++;
+        }
+        return bitCnt > 0 ? bitCnt - 1 : 0;
+    }
+
+    private static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
+    }
+
+    @Override
+    public DelayedDeliveryTracker 
createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        SubContext subContext = 
subscriptionContexts.computeIfAbsent(subscriptionName,
+                k -> new SubContext(dispatcher, tickTimeMillis, 
isDelayedDeliveryDeliverAtTimeStrict,
+                        fixedDelayDetectionLookahead, clock));
+        return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);
+    }
+
+    @Override
+    public void unregister(AbstractPersistentDispatcherMultipleConsumers 
dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        subscriptionContexts.remove(subscriptionName);
+        // If no more subscriptions, proactively free index and release memory
+        if (subscriptionContexts.isEmpty()) {
+            timerLock.lock();
+            try {
+                if (timeout != null) {
+                    timeout.cancel();
+                    timeout = null;
+                }
+                currentTimeoutTarget = -1;
+            } finally {
+                timerLock.unlock();
+            }
+            delayedMessageMap.clear();
+            bucketLocks.clear();
+            delayedMessagesCount.set(0);
+            bufferMemoryBytes.set(0);
+            if (onEmptyCallback != null) {
+                try {
+                    onEmptyCallback.run();
+                } catch (Throwable t) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("onEmptyCallback failed", t);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onTickTimeUpdated(long newTickTimeMillis) {
+        if (this.tickTimeMillis == newTickTimeMillis) {
+            return;
+        }
+        this.tickTimeMillis = newTickTimeMillis;
+        // Update precision bits for new tick time (accept old/new buckets 
co-exist)
+        this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(newTickTimeMillis);
+        // Propagate to all subscriptions
+        for (SubContext sc : subscriptionContexts.values()) {
+            sc.tickTimeMillis = newTickTimeMillis;
+        }
+        // Re-evaluate timer scheduling with new tick time
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Updated tickTimeMillis for topic-level delayed delivery 
manager to {} ms", newTickTimeMillis);
+        }
+    }
+
+    @Override
+    public long topicBufferMemoryBytes() {
+        return bufferMemoryBytes.get();
+    }
+
+    @Override
+    public long topicDelayedMessages() {
+        return delayedMessagesCount.get();
+    }
+
+    @Override
+    public void close() {
+        timerLock.lock();
+        try {
+            if (timeout != null) {
+                timeout.cancel();
+                timeout = null;
+            }
+            currentTimeoutTarget = -1;
+        } finally {
+            timerLock.unlock();
+        }
+        delayedMessageMap.clear();
+        bucketLocks.clear();
+        subscriptionContexts.clear();
+        delayedMessagesCount.set(0);
+        bufferMemoryBytes.set(0);
+    }
+
+    // Internal methods for subscription views
+
+    /**
+     * Add a message to the global delayed message index.
+     */
+    boolean addMessageForSub(SubContext subContext, long ledgerId, long 
entryId, long deliverAt) {
+        if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) {
+            return false;
+        }
+
+        long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+        ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new 
ReentrantLock());
+        bLock.lock();
+        try {
+            ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap =
+                    delayedMessageMap.computeIfAbsent(timestamp, k -> new 
ConcurrentHashMap<>());
+            Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k 
-> new Roaring64Bitmap());
+            long before = entryIds.getLongSizeInBytes();
+            if (!entryIds.contains(entryId)) {
+                entryIds.add(entryId);
+                delayedMessagesCount.incrementAndGet();
+                long after = entryIds.getLongSizeInBytes();
+                bufferMemoryBytes.addAndGet(after - before);
+            }
+        } finally {
+            bLock.unlock();
+        }
+
+        // Timer update and fixed delay detection
+        timerLock.lock();
+        try {
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        checkAndUpdateHighest(deliverAt);
+        return true;
+    }
+
+    private void checkAndUpdateHighest(long deliverAt) {
+        if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
+            messagesHaveFixedDelay = false;
+        }
+        highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, 
deliverAt);
+    }
+
+    /**
+     * Check if there are messages available for a subscription.
+     */
+    boolean hasMessageAvailableForSub(SubContext subContext) {
+        if (delayedMessageMap.isEmpty()) {
+            return false;
+        }
+        Long firstKey = delayedMessageMap.firstKey();
+        if (firstKey == null) {
+            return false;
+        }
+        long cutoffTime = subContext.getCutoffTime();
+        return firstKey <= cutoffTime;
+    }
+
+    /**
+     * Get scheduled messages for a subscription.
+     *
+     * <p>Walks the buckets whose timestamp is at or before the subscription's cutoff time and
+     * collects positions that lie after the subscription's mark-delete position, stopping once
+     * {@code maxMessages} positions have been taken. When at least one position is returned,
+     * the global index is pruned afterwards.
+     *
+     * @param subContext  subscription context supplying cutoff time and mark-delete position
+     * @param maxMessages upper bound on the number of positions to collect
+     * @return the collected positions in sorted order; empty if nothing is due
+     */
+    NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, int maxMessages) {
+        NavigableSet<Position> due = new TreeSet<>();
+        int budget = maxMessages;
+
+        // The mark-delete position is refreshed a single time, before any bucket lock is taken.
+        refreshMarkDeletePosition(subContext);
+        long cutoffTime = subContext.getCutoffTime();
+        Position markDelete = subContext.getMarkDeletePosition();
+
+        // Copy the keys of all buckets up to the cutoff, then visit each under its own lock.
+        java.util.List<Long> dueBuckets =
+                new java.util.ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet());
+        for (Long bucketTs : dueBuckets) {
+            if (budget <= 0) {
+                break;
+            }
+            ReentrantLock bucketLock = bucketLocks.get(bucketTs);
+            if (bucketLock == null) {
+                continue;
+            }
+            bucketLock.lock();
+            try {
+                ConcurrentHashMap<Long, Roaring64Bitmap> byLedger = delayedMessageMap.get(bucketTs);
+                if (byLedger == null) {
+                    continue;
+                }
+                for (Map.Entry<Long, Roaring64Bitmap> perLedger : byLedger.entrySet()) {
+                    if (budget <= 0) {
+                        break;
+                    }
+                    long ledgerId = perLedger.getKey();
+                    // Ledgers strictly before the mark-delete ledger are skipped as a whole.
+                    boolean beforeMarkDeleteLedger =
+                            markDelete != null && ledgerId < markDelete.getLedgerId();
+                    if (beforeMarkDeleteLedger) {
+                        continue;
+                    }
+                    org.roaringbitmap.longlong.LongIterator cursor =
+                            perLedger.getValue().getLongIterator();
+                    while (cursor.hasNext() && budget > 0) {
+                        long entryId = cursor.next();
+                        // Within the mark-delete ledger, entries up to the mark-delete entry are skipped.
+                        boolean atOrBeforeMarkDelete = markDelete != null
+                                && ledgerId == markDelete.getLedgerId()
+                                && entryId <= markDelete.getEntryId();
+                        if (!atOrBeforeMarkDelete) {
+                            due.add(PositionFactory.create(ledgerId, entryId));
+                            budget--;
+                        }
+                    }
+                }
+            } finally {
+                bucketLock.unlock();
+            }
+        }
+
+        // Nothing collected: return without pruning.
+        if (due.isEmpty()) {
+            return due;
+        }
+
+        // Prune global index based on min mark-delete across all subscriptions (write path)
+        pruneByMinMarkDelete();
+        return due;
+    }
+
+    /**
+     * Check if deliveries should be paused for a subscription.
+     *
+     * <p>Deliveries are paused only when all of the following hold: the subscription's
+     * fixed-delay detection lookahead is positive, {@code messagesHaveFixedDelay} is set,
+     * the number of delayed messages visible to the subscription has reached the lookahead,
+     * and no message is currently available for the subscription.
+     *
+     * @param subContext subscription context supplying the lookahead threshold
+     * @return {@code true} if all deliveries for this subscription should be paused
+     */
+    boolean shouldPauseAllDeliveriesForSub(SubContext subContext) {
+        // Parity with legacy: pause if all observed delays are fixed and backlog is large enough
+        if (subContext.getFixedDelayDetectionLookahead() <= 0) {
+            return false;
+        }
+        if (!messagesHaveFixedDelay) {
+            return false;
+        }
+        if (getNumberOfVisibleDelayedMessagesForSub(subContext)
+                < subContext.getFixedDelayDetectionLookahead()) {
+            return false;
+        }
+        return !hasMessageAvailableForSub(subContext);
+    }
+
+    /**
+     * Clear request for a single subscription.
+     *
+     * <p>Intentionally does nothing: the delayed-message index is topic-wide, so it is not
+     * cleared on behalf of an individual subscription.
+     */
+    void clearForSub() {
+        // Deliberate no-op; the global index is left untouched.
+    }
+
+    /**
+     * Stores a new mark-delete position on the subscription context and then prunes the
+     * global index against the minimum mark-delete position.
+     *
+     * @param subContext subscription context to update
+     * @param position   new mark-delete position for that subscription
+     */
+    void updateMarkDeletePosition(SubContext subContext, Position position) {
+        // Record the subscription's progress first so pruning sees the new position.
+        subContext.updateMarkDeletePosition(position);
+
+        pruneByMinMarkDelete();
+    }
+
+    // Private helper methods
+
+    /**
+     * Re-arms the delivery timer for the earliest bucket in {@code delayedMessageMap}.
+     *
+     * <p>The visible caller invokes this while holding {@code timerLock}.
+     * <ul>
+     *   <li>If the index is empty, any pending timeout is cancelled and cleared.</li>
+     *   <li>If the timer already targets the earliest bucket, nothing changes.</li>
+     *   <li>Otherwise the pending timeout is cancelled and, unless the earliest bucket is
+     *       already due, a new one is scheduled no earlier than one tick after
+     *       {@code lastTickRun}.</li>
+     * </ul>
+     */
+    private void updateTimerLocked() {
+        // firstEntry() yields null on an empty map; firstKey() would throw
+        // NoSuchElementException if the map were emptied after an isEmpty() check.
+        Map.Entry<Long, ?> earliest = delayedMessageMap.firstEntry();
+        if (earliest == null) {
+            if (timeout != null) {
+                currentTimeoutTarget = -1;
+                timeout.cancel();
+                timeout = null;
+            }
+            return;
+        }
+        long nextDeliveryTime = earliest.getKey();
+        if (nextDeliveryTime == currentTimeoutTarget) {
+            // The timer is already armed for this bucket.
+            return;
+        }
+        if (timeout != null) {
+            timeout.cancel();
+            // Forget the cancelled timer. Otherwise, if we return below without scheduling,
+            // a later call whose earliest key equals the stale target would wrongly
+            // conclude that a timer is still pending for it.
+            timeout = null;
+            currentTimeoutTarget = -1;
+        }
+        long now = clock.millis();
+        long delayMillis = nextDeliveryTime - now;
+        if (delayMillis < 0) {
+            // The earliest bucket is already due, so no timer is armed here.
+            // NOTE(review): presumably the dispatcher picks these messages up on its next
+            // read, as in the legacy tracker; confirm.
+            return;
+        }
+        // Do not fire before a full tick has elapsed since the last tick run.
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
+        currentTimeoutTarget = nextDeliveryTime;
+        timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Retained for compatibility only; performs no work in incremental mode.
+     */
+    private void updateBufferMemoryEstimate() {
+        // Deliberately empty.
+    }
+
+    /**
+     * Returns the number of delayed messages counted for the given subscription.
+     *
+     * <p>Currently this is the topic-wide total from {@code delayedMessagesCount};
+     * {@code subContext} is not consulted.
+     *
+     * @param subContext subscription context (unused by the current implementation)
+     * @return the total delayed-message count
+     */
+    private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) {
+        // Simplified: the global count stands in for a per-subscription count. It could be
+        // refined to count only the messages visible to this subscription.
+        long totalTracked = delayedMessagesCount.get();
+        return totalTracked;
+    }
+
+    private void pruneByMinMarkDelete() {
+        // Find the minimum mark delete position across all subscriptions
+        Position minMarkDelete = null;
+        for (SubContext subContext : subscriptionContexts.values()) {
+            Position markDelete = subContext.getMarkDeletePosition();
+            if (markDelete != null) {
+                if (minMarkDelete == null || 
markDelete.compareTo(minMarkDelete) < 0) {
+                    minMarkDelete = markDelete;
+                }
+            }
+        }
+
+        if (minMarkDelete == null) {
+            return;
+        }
+
+        // No idempotency set to clean (Option A): rely on per-bitmap removal 
below
+
+        // Prune per bucket under bucket lock
+        for (Long ts : new ArrayList<>(delayedMessageMap.keySet())) {
+            ReentrantLock bLock = bucketLocks.get(ts);
+            if (bLock == null) {
+                continue;
+            }
+            bLock.lock();
+            try {
+                ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = 
delayedMessageMap.get(ts);
+                if (ledgerMap == null) {
+                    continue;
+                }
+                ArrayList<Long> ledgersToRemove = new ArrayList<>();
+                for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : 
ledgerMap.entrySet()) {
+                    long ledgerId = ledgerEntry.getKey();
+                    Roaring64Bitmap entryIds = ledgerEntry.getValue();
+                    if (ledgerId < minMarkDelete.getLedgerId()) {
+                        long bytes = entryIds.getLongSizeInBytes();
+                        
delayedMessagesCount.addAndGet(-entryIds.getLongCardinality());
+                        bufferMemoryBytes.addAndGet(-bytes);
+                        ledgersToRemove.add(ledgerId);
+                    } else if (ledgerId == minMarkDelete.getLedgerId()) {
+                        long before = entryIds.getLongSizeInBytes();
+                        long removedCount = 0;
+                        org.roaringbitmap.longlong.LongIterator it = 
entryIds.getLongIterator();
+                        java.util.ArrayList<Long> toRemove = new 
java.util.ArrayList<>();

Review Comment:
   Using fully qualified class names (`java.util.ArrayList`) instead of imports 
reduces code readability. Since `ArrayList` is already imported at line 25, use 
the short form here.
   ```suggestion
                           ArrayList<Long> toRemove = new ArrayList<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to