Denovo1998 commented on code in PR #24927: URL: https://github.com/apache/pulsar/pull/24927#discussion_r2484806334
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.LongIterator; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory 
implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Prune throttling + // Last pruning time + private final AtomicLong lastPruneNanos = new AtomicLong(0); + // Minimum interval between prunes + private final long minPruneIntervalNanos; + + // Ratio of eligible subscriptions required to opportunistically prune [0.0, 1.0] + private final double pruneEligibleRatio; + + // Fixed-delay detection (parity with legacy behavior) + private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0); + private volatile boolean messagesHaveFixedDelay = true; + + // Per-bucket locks (timestamp -> lock) for 
fine-grained concurrency + private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>(); + + // Timer state guard + private final ReentrantLock timerLock = new ReentrantLock(); + + /** + * Subscription context that holds per-subscription state. + */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private volatile long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + private volatile Position markDeletePosition; + + /** + * Constructs a new SubContext for a subscription. + * + * @param dispatcher the dispatcher associated with the subscription + * @param tickTimeMillis the tick interval in milliseconds for delayed delivery checks + * @param isDelayedDeliveryDeliverAtTimeStrict if true, delayed messages are delivered strictly at their + * scheduled time; if false, messages may be delivered in the next + * tick window + * @param fixedDelayDetectionLookahead the lookahead window (in milliseconds) used for + * detecting fixed-delay messages + * @param clock the clock instance used for time calculations + */ + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? 
now : now + tickTimeMillis; + } + } + + private final Runnable onEmptyCallback; + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, + 0, 0.5, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead, + long pruneMinIntervalMillis, + double pruneEligibleRatio, + Runnable onEmptyCallback) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.onEmptyCallback = onEmptyCallback; + // Prune throttle interval: use configured override if positive, otherwise adaptive clamp [5ms, 50ms] + long pruneMs = pruneMinIntervalMillis > 0 + ? pruneMinIntervalMillis + : Math.max(5L, Math.min(50L, tickTimeMillis)); + this.minPruneIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pruneMs); + // Prune eligible ratio: clamp into [0.0, 1.0] + if (Double.isNaN(pruneEligibleRatio)) { + pruneEligibleRatio = 0.5; + } + this.pruneEligibleRatio = Math.max(0.0, Math.min(1.0, pruneEligibleRatio)); + } + + // We bucket messages by aligning the deliverAt timestamp to the start of the logical tick window: + // bucketStart = deliverAt - (deliverAt % tickTimeMillis) + // If tickTimeMillis changes over time, the same message may land in different buckets when re-added + // by another subscription. Read paths dedup via TreeSet and counts include duplicates by design. 
+ private long bucketStart(long timestamp) { + long t = this.tickTimeMillis; + if (t <= 0) { + return timestamp; + } + long mod = timestamp % t; + if (mod == 0) { + return timestamp; + } + return timestamp - mod; + } + + @Override + public DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, + k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, clock)); + return new InMemoryTopicDelayedDeliveryTracker(this, subContext); + } + + @Override + public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + subscriptionContexts.remove(subscriptionName); + // If no more subscriptions, proactively free index and release memory + if (subscriptionContexts.isEmpty()) { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + if (onEmptyCallback != null) { + try { + onEmptyCallback.run(); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("onEmptyCallback failed", t); + } + } + } + } + } + + /** + * Whether there are active subscriptions registered with this manager. 
+ */ + public boolean hasActiveSubscriptions() { + return !subscriptionContexts.isEmpty(); + } + + @Override + public void onTickTimeUpdated(long newTickTimeMillis) { + if (this.tickTimeMillis == newTickTimeMillis) { + return; + } + this.tickTimeMillis = newTickTimeMillis; + // Propagate to all subscriptions + for (SubContext sc : subscriptionContexts.values()) { + sc.tickTimeMillis = newTickTimeMillis; + } + // Re-evaluate timer scheduling with new tick time + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + if (log.isDebugEnabled()) { + log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis); + } + } Review Comment: @lhotari @codelipenghui @coderzc @Apurva007 @thetumbled @dao-jun @BewareMyPower Currently, getScheduledMessagesForSub neither marks the returned positions as "scheduled for this subscription" nor removes them from the topic-level index; they can only be removed by the shared pruning pass once all subscriptions have mark-deleted past them. As a result, until the messages are acknowledged, the next call to getScheduledMessagesForSub returns the same positions again. This causes duplicate scheduling and duplicate reads and, in severe cases, duplicate delivery. The original per-subscription InMemoryDelayedDeliveryTracker removes positions from its own per-subscription structure as getScheduledMessages returns them, which prevents duplicate scheduling. With a shared index, each subscription additionally needs its own collection (or cursor) of "pending" positions, meaning positions that have been scheduled but not yet acknowledged. getScheduledMessagesForSub should filter these out and must not return them again until they have been mark-deleted. **This needs to be fixed, but there does not seem to be a low-cost way to do it.** Possible approaches: 1. Bucket the index with a fixed `indexGranularityMillis` (either the initial tick time or a fixed constant such as 10 ms or 1 ms) that no longer changes when resetTickTime is called; resetTickTime would then only affect how often the timer fires.
2. Deduplicate on the read side. In SubContext, maintain a pending collection mapping ledgerId -> Roaring64Bitmap(entryId). getScheduledMessagesForSub records the positions it returns in this pending collection, and subsequent calls filter out anything already pending. When the mark-delete position advances, the acknowledged positions are removed from pending. The timer's readiness check should also ignore pending positions, so that it does not keep firing (spinning idly) for entries that have already been dispatched. 3. Alternatively, block and rebuild delayedMessageMap in onTickTimeUpdated, so that existing entries are re-bucketed under the new tick time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
