Copilot commented on code in PR #24927:
URL: https://github.com/apache/pulsar/pull/24927#discussion_r2484219126
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -370,7 +370,11 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(category = CATEGORY_SERVER, doc = """
Class name of the factory that implements the delayed deliver
tracker.
If value is
"org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", \
- will create bucket based delayed message index tracker.
+ will create bucket based delayed message index tracker.\n
+ If value is
"org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory", \
+ will create topic-level in-memory delayed message index tracker.\n
+ If value is
"org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory", \
+ will create in-memory delayed delivery tracker (per existing
implementation).
Review Comment:
The documentation should clarify the key difference between these two
implementations: InMemoryTopicDelayedDeliveryTrackerFactory shares a single
index across all subscriptions on a topic (reducing memory usage in
multi-subscription scenarios), while InMemoryDelayedDeliveryTrackerFactory
maintains separate indexes per subscription. This distinction is important for
users choosing between the implementations.
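To make the distinction concrete, here is a minimal sketch of the two index layouts. It is only an illustration under stated assumptions: `DelayedIndexSketch`, its methods, and the per-subscription `deliveredUpTo` cursor are hypothetical and do not appear in this PR or in Pulsar. The sketch counts index entries only and says nothing about how either factory stores them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these types exist in Pulsar.
final class DelayedIndexSketch {

    /** Per-subscription layout: every subscription owns a full (deliverAt -> messageId) index. */
    static int perSubscriptionEntries(int delayedMessages, int subscriptions) {
        Map<String, TreeMap<Long, Long>> indexes = new HashMap<>();
        for (int s = 0; s < subscriptions; s++) {
            TreeMap<Long, Long> index = new TreeMap<>();
            for (long m = 0; m < delayedMessages; m++) {
                index.put(1_000L + m, m); // deliverAt timestamp -> message id
            }
            indexes.put("sub-" + s, index);
        }
        // Total entries held across all subscriptions.
        return indexes.values().stream().mapToInt(TreeMap::size).sum();
    }

    /** Topic-level layout: one shared index plus a small per-subscription cursor (assumed). */
    static int sharedEntries(int delayedMessages, int subscriptions) {
        TreeMap<Long, Long> shared = new TreeMap<>();
        for (long m = 0; m < delayedMessages; m++) {
            shared.put(1_000L + m, m);
        }
        Map<String, Long> deliveredUpTo = new HashMap<>();
        for (int s = 0; s < subscriptions; s++) {
            deliveredUpTo.put("sub-" + s, 0L); // one cursor entry per subscription
        }
        return shared.size() + deliveredUpTo.size();
    }

    public static void main(String[] args) {
        System.out.println(perSubscriptionEntries(1000, 5)); // 5 full copies of the index
        System.out.println(sharedEntries(1000, 5));          // 1 shared copy + 5 cursors
    }
}
```

With 1000 delayed messages and 5 subscriptions, the per-subscription layout holds 5000 entries while the shared layout holds 1005, which is the kind of saving the documentation could call out.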
##########
conf/standalone.conf:
##########
@@ -1422,6 +1422,8 @@ delayedDeliveryEnabled=true
# Class name of the factory that implements the delayed deliver tracker.
# If value is
"org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
# will create bucket based delayed message index tracker.
+# If value is
"org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory",
+# will use topic-level in-memory delayed message index tracker (alias of the
default in-memory implementation).
Review Comment:
The comment incorrectly describes
InMemoryTopicDelayedDeliveryTrackerFactory as an 'alias of the default
in-memory implementation'. It is a distinct implementation that shares one
index per topic across subscriptions, which reduces memory usage in
multi-subscription scenarios. The comment should say so.
```suggestion
# will use topic-level in-memory delayed message index tracker (distinct from the
# default in-memory implementation; uses shared topic-level indexing for reduced
# memory usage in multi-subscription scenarios).
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactory.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Clock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InMemoryTopicDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrackerFactory {
+
+ private static final Logger log =
LoggerFactory.getLogger(InMemoryTopicDelayedDeliveryTrackerFactory.class);
+
+ private Timer timer;
+
+ private long tickTimeMillis;
+
+ private boolean isDelayedDeliveryDeliverAtTimeStrict;
+
+ private long fixedDelayDetectionLookahead;
+
+ // New tuning knobs
+ private long pruneMinIntervalMillis;
+ private double pruneEligibleRatio;
+ private long topicManagerIdleMillis;
+
+ // Cache of topic-level managers: topic name -> manager instance
+ private final ConcurrentMap<String, TopicDelayedDeliveryTrackerManager>
topicManagers = new ConcurrentHashMap<>();
+
+ @VisibleForTesting
+ InMemoryTopicDelayedDeliveryTrackerFactory(Timer timer, long
tickTimeMillis,
+ boolean
isDelayedDeliveryDeliverAtTimeStrict,
+ long fixedDelayDetectionLookahead) {
+ this.timer = timer;
+ this.tickTimeMillis = tickTimeMillis;
+ this.isDelayedDeliveryDeliverAtTimeStrict =
isDelayedDeliveryDeliverAtTimeStrict;
+ this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+ this.pruneMinIntervalMillis = 0;
+ this.pruneEligibleRatio = 0.5;
+ this.topicManagerIdleMillis = 0;
+ }
+
+ @VisibleForTesting
+ int getCachedManagersSize() {
+ return topicManagers.size();
+ }
+
+ @VisibleForTesting
+ boolean hasManagerForTopic(String topicName) {
+ return topicManagers.containsKey(topicName);
+ }
+
+ @Override
+ public void initialize(PulsarService pulsarService) {
+ ServiceConfiguration config = pulsarService.getConfig();
+ this.timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-delayed-delivery"),
+ config.getDelayedDeliveryTickTimeMillis(),
TimeUnit.MILLISECONDS);
+ this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
+ this.isDelayedDeliveryDeliverAtTimeStrict =
config.isDelayedDeliveryDeliverAtTimeStrict();
+ this.fixedDelayDetectionLookahead =
config.getDelayedDeliveryFixedDelayDetectionLookahead();
+ this.pruneMinIntervalMillis =
config.getDelayedDeliveryPruneMinIntervalMillis();
+ this.pruneEligibleRatio =
config.getDelayedDeliveryPruneEligibleRatio();
+ this.topicManagerIdleMillis =
config.getDelayedDeliveryTopicManagerIdleMillis();
+ }
+
+ @Override
+ public DelayedDeliveryTracker
newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+ String topicName = dispatcher.getTopic().getName();
+ String subscriptionName = dispatcher.getSubscription().getName();
+ DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
+ try {
+ tracker = newTracker0(dispatcher);
+ } catch (Exception e) {
+ // it should never go here
+ log.warn("Failed to create InMemoryDelayedDeliveryTracker, topic
{}, subscription {}",
Review Comment:
The error message references 'InMemoryDelayedDeliveryTracker' but should
reference 'InMemoryTopicDelayedDeliveryTracker' to accurately reflect the class
being created in this factory.
```suggestion
            log.warn("Failed to create InMemoryTopicDelayedDeliveryTracker, topic {}, subscription {}",
```
##########
conf/broker.conf:
##########
@@ -654,6 +654,8 @@ delayedDeliveryEnabled=true
# Class name of the factory that implements the delayed deliver tracker.
# If value is
"org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
# will create bucket based delayed message index tracker.
+# If value is
"org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory",
+# will use topic-level in-memory delayed message index tracker (alias of the
default in-memory implementation).
Review Comment:
The comment incorrectly describes
InMemoryTopicDelayedDeliveryTrackerFactory as an 'alias of the default
in-memory implementation'. It is a distinct implementation that shares one
index per topic across subscriptions, which reduces memory usage in
multi-subscription scenarios. The comment should say so.
```suggestion
# will use a topic-level in-memory delayed message index tracker (a distinct
# implementation that shares the index across subscriptions to reduce memory
# usage in multi-subscription scenarios).
```