This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5bc399de8cad0a97673cd9415bd54c679f461221 Author: Michael Marshall <mmarsh...@apache.org> AuthorDate: Thu Jun 16 00:40:24 2022 -0500 [broker] Add config to allow deliverAt time to be strictly honored (#16068) * [broker] Add config to allow deliverAt time to be strictly honored * Fix checkstyle error (this is what happens why you change names last minute) * Improve documentation; add private final modifiers The current implementation for `InMemoryDelayedDeliveryTracker` allows messages to deliver early when their `deliverAt` time is within `tickTimeMillis` from now. This is an optimization that ensures messages deliver around the `deliverAt` time. However, some use cases require that messages do not deliver before the `deliverAt` time. (Note that the client api includes a `deliverAfter` method that implies messages won't deliver before some duration of time.) In order to support this alternative implementation, this PR adds a broker configuration named `isDelayedDeliveryDeliverAtTimeStrict`. When true, messages will only deliver when the `deliverAt` time is greater than or equal to `now`. Note that a tradeoff here is that messages will be later than the `deliverAt` time. There are two factors that will determine how late messages will get to consumers. The first is the topic's `DelayedDeliveryTickTimeMillis` and the second is the broker's `delayedDeliveryTickTimeMillis`. The first will determine how frequently a timer will be scheduled to deliver delayed messages. The second is used to determine the tick time of the `HashedWheelTimer`, and as a result, can compound with the topic's delay to make a message deliver even later. * Add broker config named `isDelayedDeliveryDeliverAtTimeStrict`. This config defaults to `false` to maintain the original behavior. * Update the `InMemoryDelayedDeliveryTracker#addMessage` method so that it will return false when `deliverAt <= getCutoffTime()` instead of just `deliverAt <= getCutoffTime()`. * Update documentation in several places. * Implement `InMemoryDelayedDeliveryTracker#getCutoffTime` method that returns the right cutoff time based on the value of `isDelayedDeliveryDeliverAtTimeStrict`. This is the core logical change. * Update `InMemoryDelayedDeliveryTracker#updateTimer` so that it will not schedule a tick to run sooner that the most recent tick run plus the `tickTimeMillis`. This will ensure the timer is not run too frequently. It is also backwards compatible since the existing feature will deliver any messages that were within now plus the `tickTimeMillis`. * Add new tests to cover the new configuration. New tests are added as part of this change. This is a new feature that maintains backwards compatibility. --- conf/broker.conf | 10 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 13 +- .../delayed/InMemoryDelayedDeliveryTracker.java | 83 ++++++++---- .../InMemoryDelayedDeliveryTrackerFactory.java | 6 +- .../delayed/InMemoryDeliveryTrackerTest.java | 149 +++++++++++++++++++-- site2/docs/concepts-messaging.md | 2 + 6 files changed, 224 insertions(+), 39 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index db8b618fff8..3c4e4b4524f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -487,9 +487,19 @@ delayedDeliveryEnabled=true # Control the tick time for when retrying on delayed delivery, # affecting the accuracy of the delivery time compared to the scheduled time. +# Note that this time is used to configure the HashedWheelTimer's tick time for the +# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory). # Default is 1 second. delayedDeliveryTickTimeMillis=1000 +# When using the InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory), whether +# the deliverAt time is strictly followed. When false (default), messages may be sent to consumers before the deliverAt +# time by as much as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index +# for a potentially very short time period. When true, messages will not be sent to consumer until the deliverAt time +# has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the +# delayedDeliveryTickTimeMillis. +isDelayedDeliveryDeliverAtTimeStrict=false + # Whether to enable acknowledge of batch local index. acknowledgmentAtBatchIndexLevelEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index bbdbc31b3f1..47e47a71d3a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -274,9 +274,20 @@ public class ServiceConfiguration implements PulsarConfiguration { private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory"; @FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, " - + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.") + + "affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second. " + + "Note that this time is used to configure the HashedWheelTimer's tick time for the " + + "InMemoryDelayedDeliveryTrackerFactory.") private long delayedDeliveryTickTimeMillis = 1000; + @FieldContext(category = CATEGORY_SERVER, doc = "When using the InMemoryDelayedDeliveryTrackerFactory (the default " + + "DelayedDeliverTrackerFactory), whether the deliverAt time is strictly followed. When false (default), " + + "messages may be sent to consumers before the deliverAt time by as much as the tickTimeMillis. This can " + + "reduce the overhead on the broker of maintaining the delayed index for a potentially very short time " + + "period. When true, messages will not be sent to consumer until the deliverAt time has passed, and they " + + "may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the " + + "delayedDeliveryTickTimeMillis.") + private boolean isDelayedDeliveryDeliverAtTimeStrict = false; + @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index") private boolean acknowledgmentAtBatchIndexLevelEnabled = false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 80ec2185e56..540c9976a48 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -46,39 +46,55 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T // Timestamp at which the timeout is currently set private long currentTimeoutTarget; + // Last time the TimerTask was triggered for this class + private long lastTickRun; + private long tickTimeMillis; private final Clock clock; - InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) { - this(dispatcher, timer, tickTimeMillis, Clock.systemUTC()); + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + + InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict) { + this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict); } InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, - long tickTimeMillis, Clock clock) { + long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) { this.dispatcher = dispatcher; this.timer = timer; this.tickTimeMillis = tickTimeMillis; this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + } + + /** + * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the + * {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay + * tracker for a brief amount of time when we're already trying to dispatch to the consumer. + * + * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages + * can be delivered. As a consequence, there are two delays that will affect delivery. The first is the + * {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity. + * + * @return the cutoff time to determine whether a message is ready to deliver to the consumer + */ + private long getCutoffTime() { + return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis; } @Override - public boolean addMessage(long ledgerId, long entryId, long deliveryAt) { - long now = clock.millis(); + public boolean addMessage(long ledgerId, long entryId, long deliverAt) { if (log.isDebugEnabled()) { log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, - deliveryAt - now); + deliverAt - clock.millis()); } - if (deliveryAt < (now + tickTimeMillis)) { - // It's already about time to deliver this message. We add the buffer of - // `tickTimeMillis` because messages can be extracted from the tracker - // slightly before the expiration time. We don't want the messages to - // go back into the delay tracker (for a brief amount of time) when we're - // trying to dispatch to the consumer. + if (deliverAt <= getCutoffTime()) { return false; } - priorityQueue.add(deliveryAt, ledgerId, entryId); + priorityQueue.add(deliverAt, ledgerId, entryId); updateTimer(); return true; } @@ -88,11 +104,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T */ @Override public boolean hasMessageAvailable() { - // Avoid the TimerTask run before reach the timeout. - long cutOffTime = clock.millis() + tickTimeMillis; - boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= cutOffTime; + boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime(); if (!hasMessageAvailable) { - // prevent the first delay message later than cutoffTime updateTimer(); } return hasMessageAvailable; @@ -105,11 +118,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T public Set<PositionImpl> getScheduledMessages(int maxMessages) { int n = maxMessages; Set<PositionImpl> positions = new TreeSet<>(); - long now = clock.millis(); - // Pick all the messages that will be ready within the tick time period. - // This is to avoid keeping rescheduling the timer for each message at - // very short delay - long cutoffTime = now + tickTimeMillis; + long cutoffTime = getCutoffTime(); while (n > 0 && !priorityQueue.isEmpty()) { long timestamp = priorityQueue.peekN1(); @@ -149,6 +158,17 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T return priorityQueue.size(); } + /** + * Update the scheduled timer task such that: + * 1. If there are no delayed messages, return and do not schedule a timer task. + * 2. If the next message in the queue has the same deliverAt time as the timer task, return and leave existing + * timer task in place. + * 3. If the deliverAt time for the next delayed message has already passed (i.e. the delay is negative), return + * without scheduling a timer task since the subscription is backlogged. + * 4. Else, schedule a timer task where the delay is the greater of these two: the next message's deliverAt time or + * the last tick time plus the tickTimeMillis (to ensure we do not schedule the task more frequently than the + * tickTimeMillis). + */ private void updateTimer() { if (priorityQueue.isEmpty()) { if (timeout != null) { @@ -169,10 +189,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T timeout.cancel(); } - long delayMillis = timestamp - clock.millis(); - if (log.isDebugEnabled()) { - log.debug("[{}] Start timer in {} millis", dispatcher.getName(), delayMillis); - } + long now = clock.millis(); + long delayMillis = timestamp - now; if (delayMillis < 0) { // There are messages that are already ready to be delivered. If @@ -184,8 +202,18 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T return; } + // Compute the earliest time that we schedule the timer to run. + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + + if (log.isDebugEnabled()) { + log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis); + } + + // Even though we may delay longer than this timestamp because of the tick delay, we still track the + // current timeout with reference to the next message's timestamp. currentTimeoutTarget = timestamp; - timeout = timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS); + timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); } @Override @@ -198,6 +226,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T } synchronized (dispatcher) { + lastTickRun = clock.millis(); currentTimeoutTarget = -1; timeout = null; dispatcher.readMoreEntries(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java index b1a9b263369..5c04a6d53b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java @@ -31,16 +31,20 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra private long tickTimeMillis; + private boolean isDelayedDeliveryDeliverAtTimeStrict; + @Override public void initialize(ServiceConfiguration config) { this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"), config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS); this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis(); + this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict(); } @Override public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) { - return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis); + return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, + isDelayedDeliveryDeliverAtTimeStrict); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index d7b304d8a0c..f44f61a67f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -21,13 +21,16 @@ package org.apache.pulsar.broker.delayed; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; @@ -40,27 +43,38 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import io.netty.util.concurrent.DefaultThreadFactory; import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; import org.testng.annotations.Test; @Test(groups = "broker") public class InMemoryDeliveryTrackerTest { + // Create a single shared timer for the test. + private final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), + 500, TimeUnit.MILLISECONDS); + + @AfterClass(alwaysRun = true) + public void cleanup() { + timer.stop(); + } + @Test public void test() throws Exception { PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - Timer timer = mock(Timer.class); - AtomicLong clockTime = new AtomicLong(); Clock clock = mock(Clock.class); when(clock.millis()).then(x -> clockTime.get()); @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock); + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + false); assertFalse(tracker.hasMessageAvailable()); @@ -131,7 +145,8 @@ public class InMemoryDeliveryTrackerTest { }); @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock); + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + false); assertTrue(tasks.isEmpty()); assertTrue(tracker.addMessage(2, 2, 20)); @@ -160,29 +175,143 @@ public class InMemoryDeliveryTrackerTest { /** * Adding a message that is about to expire within the tick time should lead - * to a rejection from the tracker. + * to a rejection from the tracker when isDelayedDeliveryDeliverAtTimeStrict is false. */ @Test public void testAddWithinTickTime() { PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - Timer timer = mock(Timer.class); - AtomicLong clockTime = new AtomicLong(); Clock clock = mock(Clock.class); when(clock.millis()).then(x -> clockTime.get()); @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock); + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, + false); clockTime.set(0); assertFalse(tracker.addMessage(1, 1, 10)); assertFalse(tracker.addMessage(2, 2, 99)); - assertTrue(tracker.addMessage(3, 3, 100)); - assertTrue(tracker.addMessage(4, 4, 200)); + assertFalse(tracker.addMessage(3, 3, 100)); + assertTrue(tracker.addMessage(4, 4, 101)); + assertTrue(tracker.addMessage(5, 5, 200)); assertEquals(tracker.getNumberOfDelayedMessages(), 2); } + public void testAddMessageWithStrictDelay() { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, + true); + + clockTime.set(10); + + // Verify behavior for the less than, equal to, and greater than deliverAt times. + assertFalse(tracker.addMessage(1, 1, 9)); + assertFalse(tracker.addMessage(4, 4, 10)); + assertTrue(tracker.addMessage(1, 1, 11)); + + assertEquals(tracker.getNumberOfDelayedMessages(), 1); + assertFalse(tracker.hasMessageAvailable()); + } + + /** + * In this test, the deliverAt time is after now, but the deliverAt time is too early to run another tick, so the + * tickTimeMillis determines the delay. + */ + public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict() throws Exception { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario. + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, + 1000, clock, true); + + // Set clock time, then run tracker to inherit clock time as the last tick time. + clockTime.set(10000); + Timeout timeout = mock(Timeout.class); + when(timeout.isCancelled()).then(x -> false); + tracker.run(timeout); + verify(dispatcher, times(1)).readMoreEntries(); + + // Add a message that has a delivery time just after the previous run. It will get delivered based on the + // tick delay plus the last tick run. + assertTrue(tracker.addMessage(1, 1, 10001)); + + // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has + // passed where it would have been triggered if the tick time was doing the triggering. + Thread.sleep(600); + verify(dispatcher, times(1)).readMoreEntries(); + + // Not wait for the message delivery to get triggered. + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> verify(dispatcher).readMoreEntries()); + } + + /** + * In this test, the deliverAt time is after now, but before the (tickTimeMillis + now). Because there wasn't a + * recent tick run, the deliverAt time determines the delay. + */ + public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + // Use a large tick time to show that the message will get delivered earlier because there wasn't + // a previous tick run. + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, + 100000, clock, true); + + clockTime.set(500000); + + assertTrue(tracker.addMessage(1, 1, 500005)); + + // Wait long enough for the runnable to run, but not longer than the tick time. The point is that the delivery + // should get scheduled early when the tick duration has passed since the last tick. + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> verify(dispatcher).readMoreEntries()); + } + + /** + * In this test, the deliverAt time is after now plus tickTimeMillis, so the tickTimeMillis determines the delay. + */ + public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws Exception { + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario. + @Cleanup + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, + 500, clock, true); + + clockTime.set(0); + + assertTrue(tracker.addMessage(1, 1, 2000)); + + // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has + // passed where it would have been triggered if the tick time was doing the triggering. + Thread.sleep(1000); + verifyNoInteractions(dispatcher); + + // Not wait for the message delivery to get triggered. + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> verify(dispatcher).readMoreEntries()); + } } diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md index 3e488b1acfb..abb68624f83 100644 --- a/site2/docs/concepts-messaging.md +++ b/site2/docs/concepts-messaging.md @@ -596,6 +596,8 @@ delayedDeliveryEnabled=true # Control the ticking time for the retry of delayed message delivery, # affecting the accuracy of the delivery time compared to the scheduled time. +# Note that this time is used to configure the HashedWheelTimer's tick time for the +# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory). # Default is 1 second. delayedDeliveryTickTimeMillis=1000 ```