This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 55826742d1c Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)
55826742d1c is described below

commit 55826742d1c589d106d7cbe97f12ec2e8bcca35f
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sat Oct 1 08:06:13 2022 -0700

    Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)
---
 conf/broker.conf                                   |  6 ++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++++
 .../delayed/InMemoryDelayedDeliveryTracker.java    | 18 ++++++++----
 .../InMemoryDelayedDeliveryTrackerFactory.java     |  5 +++-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 34 +++++++++++++---------
 5 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d117d679c85..e6b3aef8811 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -576,6 +576,12 @@ delayedDeliveryMaxNumBuckets=50
 # Enable share the delayed message index across subscriptions
 delayedDeliverySharedIndexEnabled=false
 
+# Size of the lookahead window to use when detecting if all the messages in the topic
+# have a fixed delay.
+# Default is 50,000. Setting the lookahead window to 0 will disable the logic to handle
+# fixed delays in messages in a different way.
+delayedDeliveryFixedDelayDetectionLookahead=50000
+
 # Whether to enable acknowledge of batch local index.
 acknowledgmentAtBatchIndexLevelEnabled=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8c883045e66..6683d36c36e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -372,6 +372,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed 
message index across subscriptions")
     private boolean delayedDeliverySharedIndexEnabled = false;
 
+    @FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead 
window to use "
+            + "when detecting if all the messages in the topic have a fixed 
delay. "
+            + "Default is 50,000. Setting the lookahead window to 0 will 
disable the "
+            + "logic to handle fixed delays in messages in a different way.")
+    private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;
+
     @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the 
acknowledge of batch local index")
     private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 83b113df36b..11d663322be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -59,7 +59,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     // always going to be in FIFO order, then we can avoid pulling all the 
messages in
     // tracker. Instead, we use the lookahead for detection and pause the read 
from
     // the cursor if the delays are fixed.
-    public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;
+    private final long fixedDelayDetectionLookahead;
 
     // This is the timestamp of the message with the highest delivery time
     // If new added messages are lower than this, it means the delivery is 
requested
@@ -70,17 +70,22 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
     private boolean messagesHaveFixedDelay = true;
 
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer, long tickTimeMillis,
-                                   boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
-        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict);
+                                   boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                   long fixedDelayDetectionLookahead) {
+        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict,
+                fixedDelayDetectionLookahead);
     }
 
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
-                                   long tickTimeMillis, Clock clock, boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
+                                   long tickTimeMillis, Clock clock,
+                                   boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                   long fixedDelayDetectionLookahead) {
         this.dispatcher = dispatcher;
         this.timer = timer;
         this.tickTimeMillis = tickTimeMillis;
         this.clock = clock;
         this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
     }
 
     /**
@@ -283,8 +288,9 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
     @Override
     public boolean shouldPauseAllDeliveries() {
         // Pause deliveries if we know all delays are fixed within the 
lookahead window
-        return messagesHaveFixedDelay
-                && priorityQueue.size() >= 
DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
+        return fixedDelayDetectionLookahead > 0
+                && messagesHaveFixedDelay
+                && priorityQueue.size() >= fixedDelayDetectionLookahead
                 && !hasMessageAvailable();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index 5c04a6d53b2..7bf0ca87c40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -33,18 +33,21 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra
 
     private boolean isDelayedDeliveryDeliverAtTimeStrict;
 
+    private long fixedDelayDetectionLookahead;
+
     @Override
     public void initialize(ServiceConfiguration config) {
         this.timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-delayed-delivery"),
                 config.getDelayedDeliveryTickTimeMillis(), 
TimeUnit.MILLISECONDS);
         this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
         this.isDelayedDeliveryDeliverAtTimeStrict = 
config.isDelayedDeliveryDeliverAtTimeStrict();
+        this.fixedDelayDetectionLookahead = 
config.getDelayedDeliveryFixedDelayDetectionLookahead();
     }
 
     @Override
     public DelayedDeliveryTracker 
newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
         return new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis,
-                isDelayedDeliveryDeliverAtTimeStrict);
+                isDelayedDeliveryDeliverAtTimeStrict, 
fixedDelayDetectionLookahead);
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index db2db6cc1db..1ff47a4ca50 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -74,7 +74,7 @@ public class InMemoryDeliveryTrackerTest {
 
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                false);
+                false, 0);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -146,7 +146,7 @@ public class InMemoryDeliveryTrackerTest {
 
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                false);
+                false, 0);
 
         assertTrue(tasks.isEmpty());
         assertTrue(tracker.addMessage(2, 2, 20));
@@ -187,7 +187,7 @@ public class InMemoryDeliveryTrackerTest {
 
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
-                false);
+                false, 0);
 
         clockTime.set(0);
 
@@ -209,7 +209,7 @@ public class InMemoryDeliveryTrackerTest {
 
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
-                true);
+                true, 0);
 
         clockTime.set(10);
 
@@ -236,7 +236,7 @@ public class InMemoryDeliveryTrackerTest {
         // Use a short tick time to show that the timer task is run based on 
the deliverAt time in this scenario.
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                1000, clock, true);
+                1000, clock, true, 0);
 
         // Set clock time, then run tracker to inherit clock time as the last 
tick time.
         clockTime.set(10000);
@@ -274,7 +274,7 @@ public class InMemoryDeliveryTrackerTest {
         // a previous tick run.
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                100000, clock, true);
+                100000, clock, true, 0);
 
         clockTime.set(500000);
 
@@ -299,7 +299,7 @@ public class InMemoryDeliveryTrackerTest {
         // Use a short tick time to show that the timer task is run based on 
the deliverAt time in this scenario.
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                500, clock, true);
+                500, clock, true, 0);
 
         clockTime.set(0);
 
@@ -323,9 +323,11 @@ public class InMemoryDeliveryTrackerTest {
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
+        final long fixedDelayLookahead = 100;
+
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true);
+                true, fixedDelayLookahead);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -339,13 +341,13 @@ public class InMemoryDeliveryTrackerTest {
         assertEquals(tracker.getNumberOfDelayedMessages(), 5);
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= 
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+        for (int i = 6; i <= fixedDelayLookahead; i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }
 
         assertTrue(tracker.shouldPauseAllDeliveries());
 
-        
clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
 * 10);
+        clockTime.set(fixedDelayLookahead * 10);
 
         tracker.getScheduledMessages(100);
         assertFalse(tracker.shouldPauseAllDeliveries());
@@ -367,9 +369,11 @@ public class InMemoryDeliveryTrackerTest {
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
+        long fixedDelayLookahead = 100;
+
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true);
+                true, fixedDelayLookahead);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -381,7 +385,7 @@ public class InMemoryDeliveryTrackerTest {
 
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= 
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+        for (int i = 6; i <= fixedDelayLookahead; i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }
 
@@ -401,9 +405,11 @@ public class InMemoryDeliveryTrackerTest {
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
+        long fixedDelayLookahead = 100;
+
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true);
+                true, fixedDelayLookahead);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -415,7 +421,7 @@ public class InMemoryDeliveryTrackerTest {
 
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= 
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+        for (int i = 6; i <= fixedDelayLookahead; i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }
 

Reply via email to