This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5bc399de8cad0a97673cd9415bd54c679f461221
Author: Michael Marshall <mmarsh...@apache.org>
AuthorDate: Thu Jun 16 00:40:24 2022 -0500

    [broker] Add config to allow deliverAt time to be strictly honored (#16068)
    
    * [broker] Add config to allow deliverAt time to be strictly honored
    
    * Fix checkstyle error (this is what happens why you change names last 
minute)
    
    * Improve documentation; add private final modifiers
    
    The current implementation for `InMemoryDelayedDeliveryTracker` allows 
messages to deliver early when their `deliverAt` time is within 
`tickTimeMillis` from now. This is an optimization that ensures messages 
deliver around the `deliverAt` time. However, some use cases require that 
messages do not deliver before the `deliverAt` time. (Note that the client api 
includes a `deliverAfter` method that implies messages won't deliver before 
some duration of time.)
    
    In order to support this alternative implementation, this PR adds a broker 
configuration named `isDelayedDeliveryDeliverAtTimeStrict`. When true, messages 
will only deliver when the `deliverAt` time is greater than or equal to `now`. 
Note that a tradeoff here is that messages will be later than the `deliverAt` 
time.
    
    There are two factors that will determine how late messages will get to 
consumers. The first is the topic's `DelayedDeliveryTickTimeMillis` and the 
second is the broker's `delayedDeliveryTickTimeMillis`. The first will 
determine how frequently a timer will be scheduled to deliver delayed messages. 
The second is used to determine the tick time of the `HashedWheelTimer`, and as 
a result, can compound with the topic's delay to make a message deliver even 
later.
    
    * Add broker config named `isDelayedDeliveryDeliverAtTimeStrict`. This 
config defaults to `false` to maintain the original behavior.
    * Update the `InMemoryDelayedDeliveryTracker#addMessage` method so that it 
will return false when `deliverAt <= getCutoffTime()` instead of just 
`deliverAt <= getCutoffTime()`.
    * Update documentation in several places.
    * Implement `InMemoryDelayedDeliveryTracker#getCutoffTime` method that 
returns the right cutoff time based on the value of 
`isDelayedDeliveryDeliverAtTimeStrict`. This is the core logical change.
    * Update `InMemoryDelayedDeliveryTracker#updateTimer` so that it will not 
schedule a tick to run sooner that the most recent tick run plus the 
`tickTimeMillis`. This will ensure the timer is not run too frequently. It is 
also backwards compatible since the existing feature will deliver any messages 
that were within now plus the `tickTimeMillis`.
    * Add new tests to cover the new configuration.
    
    New tests are added as part of this change.
    
    This is a new feature that maintains backwards compatibility.
---
 conf/broker.conf                                   |  10 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  13 +-
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  83 ++++++++----
 .../InMemoryDelayedDeliveryTrackerFactory.java     |   6 +-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 149 +++++++++++++++++++--
 site2/docs/concepts-messaging.md                   |   2 +
 6 files changed, 224 insertions(+), 39 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index db8b618fff8..3c4e4b4524f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -487,9 +487,19 @@ delayedDeliveryEnabled=true
 
 # Control the tick time for when retrying on delayed delivery,
 # affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time 
for the
+# InMemoryDelayedDeliveryTrackerFactory (the default 
DelayedDeliverTrackerFactory).
 # Default is 1 second.
 delayedDeliveryTickTimeMillis=1000
 
+# When using the InMemoryDelayedDeliveryTrackerFactory (the default 
DelayedDeliverTrackerFactory), whether
+# the deliverAt time is strictly followed. When false (default), messages may 
be sent to consumers before the deliverAt
+# time by as much as the tickTimeMillis. This can reduce the overhead on the 
broker of maintaining the delayed index
+# for a potentially very short time period. When true, messages will not be 
sent to consumer until the deliverAt time
+# has passed, and they may be as late as the deliverAt time plus the 
tickTimeMillis for the topic plus the
+# delayedDeliveryTickTimeMillis.
+isDelayedDeliveryDeliverAtTimeStrict=false
+
 # Whether to enable acknowledge of batch local index.
 acknowledgmentAtBatchIndexLevelEnabled=false
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index bbdbc31b3f1..47e47a71d3a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -274,9 +274,20 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private String delayedDeliveryTrackerFactoryClassName = 
"org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory";
 
     @FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for 
when retrying on delayed delivery, "
-            + " affecting the accuracy of the delivery time compared to the 
scheduled time. Default is 1 second.")
+            + "affecting the accuracy of the delivery time compared to the 
scheduled time. Default is 1 second. "
+            + "Note that this time is used to configure the HashedWheelTimer's 
tick time for the "
+            + "InMemoryDelayedDeliveryTrackerFactory.")
     private long delayedDeliveryTickTimeMillis = 1000;
 
+    @FieldContext(category = CATEGORY_SERVER, doc = "When using the 
InMemoryDelayedDeliveryTrackerFactory (the default "
+            + "DelayedDeliverTrackerFactory), whether the deliverAt time is 
strictly followed. When false (default), "
+            + "messages may be sent to consumers before the deliverAt time by 
as much as the tickTimeMillis. This can "
+            + "reduce the overhead on the broker of maintaining the delayed 
index for a potentially very short time "
+            + "period. When true, messages will not be sent to consumer until 
the deliverAt time has passed, and they "
+            + "may be as late as the deliverAt time plus the tickTimeMillis 
for the topic plus the "
+            + "delayedDeliveryTickTimeMillis.")
+    private boolean isDelayedDeliveryDeliverAtTimeStrict = false;
+
     @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the 
acknowledge of batch local index")
     private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 80ec2185e56..540c9976a48 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -46,39 +46,55 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
     // Timestamp at which the timeout is currently set
     private long currentTimeoutTarget;
 
+    // Last time the TimerTask was triggered for this class
+    private long lastTickRun;
+
     private long tickTimeMillis;
 
     private final Clock clock;
 
-    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer, long tickTimeMillis) {
-        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC());
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+
+    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer, long tickTimeMillis,
+                                   boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
+        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict);
     }
 
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
-                                   long tickTimeMillis, Clock clock) {
+                                   long tickTimeMillis, Clock clock, boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
         this.dispatcher = dispatcher;
         this.timer = timer;
         this.tickTimeMillis = tickTimeMillis;
         this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+    }
+
+    /**
+     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow 
for early delivery by as much as the
+     * {@link #tickTimeMillis} because it is a slight optimization to let 
messages skip going back into the delay
+     * tracker for a brief amount of time when we're already trying to 
dispatch to the consumer.
+     *
+     * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the 
current time to determine when messages
+     * can be delivered. As a consequence, there are two delays that will 
affect delivery. The first is the
+     * {@link #tickTimeMillis} and the second is the {@link Timer}'s 
granularity.
+     *
+     * @return the cutoff time to determine whether a message is ready to 
deliver to the consumer
+     */
+    private long getCutoffTime() {
+        return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : 
clock.millis() + tickTimeMillis;
     }
 
     @Override
-    public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
-        long now = clock.millis();
+    public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", 
dispatcher.getName(), ledgerId, entryId,
-                    deliveryAt - now);
+                    deliverAt - clock.millis());
         }
-        if (deliveryAt < (now + tickTimeMillis)) {
-            // It's already about time to deliver this message. We add the 
buffer of
-            // `tickTimeMillis` because messages can be extracted from the 
tracker
-            // slightly before the expiration time. We don't want the messages 
to
-            // go back into the delay tracker (for a brief amount of time) 
when we're
-            // trying to dispatch to the consumer.
+        if (deliverAt <= getCutoffTime()) {
             return false;
         }
 
-        priorityQueue.add(deliveryAt, ledgerId, entryId);
+        priorityQueue.add(deliverAt, ledgerId, entryId);
         updateTimer();
         return true;
     }
@@ -88,11 +104,8 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
      */
     @Override
     public boolean hasMessageAvailable() {
-        // Avoid the TimerTask run before reach the timeout.
-        long cutOffTime = clock.millis() + tickTimeMillis;
-        boolean hasMessageAvailable = !priorityQueue.isEmpty() && 
priorityQueue.peekN1() <= cutOffTime;
+        boolean hasMessageAvailable = !priorityQueue.isEmpty() && 
priorityQueue.peekN1() <= getCutoffTime();
         if (!hasMessageAvailable) {
-            // prevent the first delay message later than cutoffTime
             updateTimer();
         }
         return hasMessageAvailable;
@@ -105,11 +118,7 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
     public Set<PositionImpl> getScheduledMessages(int maxMessages) {
         int n = maxMessages;
         Set<PositionImpl> positions = new TreeSet<>();
-        long now = clock.millis();
-        // Pick all the messages that will be ready within the tick time 
period.
-        // This is to avoid keeping rescheduling the timer for each message at
-        // very short delay
-        long cutoffTime = now + tickTimeMillis;
+        long cutoffTime = getCutoffTime();
 
         while (n > 0 && !priorityQueue.isEmpty()) {
             long timestamp = priorityQueue.peekN1();
@@ -149,6 +158,17 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
         return priorityQueue.size();
     }
 
+    /**
+     * Update the scheduled timer task such that:
+     * 1. If there are no delayed messages, return and do not schedule a timer 
task.
+     * 2. If the next message in the queue has the same deliverAt time as the 
timer task, return and leave existing
+     *    timer task in place.
+     * 3. If the deliverAt time for the next delayed message has already 
passed (i.e. the delay is negative), return
+     *    without scheduling a timer task since the subscription is backlogged.
+     * 4. Else, schedule a timer task where the delay is the greater of these 
two: the next message's deliverAt time or
+     *    the last tick time plus the tickTimeMillis (to ensure we do not 
schedule the task more frequently than the
+     *    tickTimeMillis).
+     */
     private void updateTimer() {
         if (priorityQueue.isEmpty()) {
             if (timeout != null) {
@@ -169,10 +189,8 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
             timeout.cancel();
         }
 
-        long delayMillis = timestamp - clock.millis();
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), 
delayMillis);
-        }
+        long now = clock.millis();
+        long delayMillis = timestamp - now;
 
         if (delayMillis < 0) {
             // There are messages that are already ready to be delivered. If
@@ -184,8 +202,18 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
             return;
         }
 
+        // Compute the earliest time that we schedule the timer to run.
+        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
+        long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), 
calculatedDelayMillis);
+        }
+
+        // Even though we may delay longer than this timestamp because of the 
tick delay, we still track the
+        // current timeout with reference to the next message's timestamp.
         currentTimeoutTarget = timestamp;
-        timeout = timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS);
+        timeout = timer.newTimeout(this, calculatedDelayMillis, 
TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -198,6 +226,7 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
         }
 
         synchronized (dispatcher) {
+            lastTickRun = clock.millis();
             currentTimeoutTarget = -1;
             timeout = null;
             dispatcher.readMoreEntries();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index b1a9b263369..5c04a6d53b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -31,16 +31,20 @@ public class InMemoryDelayedDeliveryTrackerFactory 
implements DelayedDeliveryTra
 
     private long tickTimeMillis;
 
+    private boolean isDelayedDeliveryDeliverAtTimeStrict;
+
     @Override
     public void initialize(ServiceConfiguration config) {
         this.timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-delayed-delivery"),
                 config.getDelayedDeliveryTickTimeMillis(), 
TimeUnit.MILLISECONDS);
         this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
config.isDelayedDeliveryDeliverAtTimeStrict();
     }
 
     @Override
     public DelayedDeliveryTracker 
newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
-        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis);
+        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
tickTimeMillis,
+                isDelayedDeliveryDeliverAtTimeStrict);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index d7b304d8a0c..f44f61a67f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -21,13 +21,16 @@ package org.apache.pulsar.broker.delayed;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
+import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
@@ -40,27 +43,38 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.Cleanup;
 
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class InMemoryDeliveryTrackerTest {
 
+    // Create a single shared timer for the test.
+    private final Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+            500, TimeUnit.MILLISECONDS);
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        timer.stop();
+    }
+
     @Test
     public void test() throws Exception {
         PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
 
-        Timer timer = mock(Timer.class);
-
         AtomicLong clockTime = new AtomicLong();
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                false);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -131,7 +145,8 @@ public class InMemoryDeliveryTrackerTest {
         });
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock);
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                false);
 
         assertTrue(tasks.isEmpty());
         assertTrue(tracker.addMessage(2, 2, 20));
@@ -160,29 +175,143 @@ public class InMemoryDeliveryTrackerTest {
 
     /**
      * Adding a message that is about to expire within the tick time should 
lead
-     * to a rejection from the tracker.
+     * to a rejection from the tracker when 
isDelayedDeliveryDeliverAtTimeStrict is false.
      */
     @Test
     public void testAddWithinTickTime() {
         PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
 
-        Timer timer = mock(Timer.class);
-
         AtomicLong clockTime = new AtomicLong();
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
         @Cleanup
-        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock);
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
+                false);
 
         clockTime.set(0);
 
         assertFalse(tracker.addMessage(1, 1, 10));
         assertFalse(tracker.addMessage(2, 2, 99));
-        assertTrue(tracker.addMessage(3, 3, 100));
-        assertTrue(tracker.addMessage(4, 4, 200));
+        assertFalse(tracker.addMessage(3, 3, 100));
+        assertTrue(tracker.addMessage(4, 4, 101));
+        assertTrue(tracker.addMessage(5, 5, 200));
 
         assertEquals(tracker.getNumberOfDelayedMessages(), 2);
     }
 
+    public void testAddMessageWithStrictDelay() {
+        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
+                true);
+
+        clockTime.set(10);
+
+        // Verify behavior for the less than, equal to, and greater than 
deliverAt times.
+        assertFalse(tracker.addMessage(1, 1, 9));
+        assertFalse(tracker.addMessage(4, 4, 10));
+        assertTrue(tracker.addMessage(1, 1, 11));
+
+        assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+        assertFalse(tracker.hasMessageAvailable());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now, but the deliverAt time 
is too early to run another tick, so the
+     * tickTimeMillis determines the delay.
+     */
+    public void 
testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict() 
throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a short tick time to show that the timer task is run based on 
the deliverAt time in this scenario.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                1000, clock, true);
+
+        // Set clock time, then run tracker to inherit clock time as the last 
tick time.
+        clockTime.set(10000);
+        Timeout timeout = mock(Timeout.class);
+        when(timeout.isCancelled()).then(x -> false);
+        tracker.run(timeout);
+        verify(dispatcher, times(1)).readMoreEntries();
+
+        // Add a message that has a delivery time just after the previous run. 
It will get delivered based on the
+        // tick delay plus the last tick run.
+        assertTrue(tracker.addMessage(1, 1, 10001));
+
+        // Wait longer than the tick time plus the HashedWheelTimer's tick 
time to ensure that enough time has
+        // passed where it would have been triggered if the tick time was 
doing the triggering.
+        Thread.sleep(600);
+        verify(dispatcher, times(1)).readMoreEntries();
+
+        // Not wait for the message delivery to get triggered.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now, but before the 
(tickTimeMillis + now). Because there wasn't a
+     * recent tick run, the deliverAt time determines the delay.
+     */
+    public void 
testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() {
+        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a large tick time to show that the message will get delivered 
earlier because there wasn't
+        // a previous tick run.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                100000, clock, true);
+
+        clockTime.set(500000);
+
+        assertTrue(tracker.addMessage(1, 1, 500005));
+
+        // Wait long enough for the runnable to run, but not longer than the 
tick time. The point is that the delivery
+        // should get scheduled early when the tick duration has passed since 
the last tick.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+    }
+
+    /**
+     * In this test, the deliverAt time is after now plus tickTimeMillis, so 
the tickTimeMillis determines the delay.
+     */
+    public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() 
throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        // Use a short tick time to show that the timer task is run based on 
the deliverAt time in this scenario.
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer,
+                500, clock, true);
+
+        clockTime.set(0);
+
+        assertTrue(tracker.addMessage(1, 1, 2000));
+
+        // Wait longer than the tick time plus the HashedWheelTimer's tick 
time to ensure that enough time has
+        // passed where it would have been triggered if the tick time was 
doing the triggering.
+        Thread.sleep(1000);
+        verifyNoInteractions(dispatcher);
+
+        // Not wait for the message delivery to get triggered.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> verify(dispatcher).readMoreEntries());
+    }
 }
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 3e488b1acfb..abb68624f83 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -596,6 +596,8 @@ delayedDeliveryEnabled=true
 
 # Control the ticking time for the retry of delayed message delivery,
 # affecting the accuracy of the delivery time compared to the scheduled time.
+# Note that this time is used to configure the HashedWheelTimer's tick time 
for the
+# InMemoryDelayedDeliveryTrackerFactory (the default 
DelayedDeliverTrackerFactory).
 # Default is 1 second.
 delayedDeliveryTickTimeMillis=1000
 ```

Reply via email to