This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c48a3243287 Avoid tracking the delays of all the message when we 
detect that they are fixed (#16609)
c48a3243287 is described below

commit c48a3243287c7d775459b6437d9f4b24ed44cf4c
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri Jul 15 09:01:41 2022 -0700

    Avoid tracking the delays of all the message when we detect that they are 
fixed (#16609)
    
    * Avoid tracking the delays of all the message when we detect that they are 
fixed
    
    * Use tick time to avoid clock skews across different producers
---
 .../broker/delayed/DelayedDeliveryTracker.java     |   5 +
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  47 ++++++++-
 .../broker/service/AbstractBaseDispatcher.java     |   3 +-
 .../PersistentDispatcherMultipleConsumers.java     |  17 +++-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 113 +++++++++++++++++++++
 5 files changed, 179 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 2fbd9a51d4a..35853d3599b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -55,6 +55,11 @@ public interface DelayedDeliveryTracker extends 
AutoCloseable {
      */
     Set<PositionImpl> getScheduledMessages(int maxMessages);
 
+    /**
+     * Tells whether the dispatcher should pause any message deliveries, until 
the DelayedDeliveryTracker has
+     * more messages available.
+     */
+    boolean shouldPauseAllDeliveries();
 
     /**
      *  Reset tick time use zk policies cache.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 92df563dad4..837d3d1872c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -55,6 +55,20 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
 
     private final boolean isDelayedDeliveryDeliverAtTimeStrict;
 
+    // If we detect that all messages have a fixed delay time, such that delivery is
+    // always going to be in FIFO order, then we can avoid pulling all the messages
+    // into the tracker. Instead, we use a lookahead window for detection and pause
+    // reading from the cursor if the delays are fixed.
+    public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;
+
+    // This is the timestamp of the message with the highest delivery time.
+    // If a newly added message has a lower delivery time than this, delivery is
+    // being requested out of order. It gets reset to 0 once the tracker is emptied.
+    private long highestDeliveryTimeTracked = 0;
+
+    // Track whether we have seen all messages with fixed delay so far.
+    private boolean messagesHaveFixedDelay = true;
+
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers 
dispatcher, Timer timer, long tickTimeMillis,
                                    boolean 
isDelayedDeliveryDeliverAtTimeStrict) {
         this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), 
isDelayedDeliveryDeliverAtTimeStrict);
@@ -86,16 +100,28 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
 
     @Override
     public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
+        if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
+            messagesHaveFixedDelay = false;
+            return false;
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", 
dispatcher.getName(), ledgerId, entryId,
                     deliverAt - clock.millis());
         }
-        if (deliverAt <= getCutoffTime()) {
-            return false;
-        }
+
 
         priorityQueue.add(deliverAt, ledgerId, entryId);
         updateTimer();
+
+        // Check that the new delivery time comes after the current highest, or is
+        // at most one tick interval (tickTimeMillis) earlier than it.
+        if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
+            messagesHaveFixedDelay = false;
+        }
+
+        highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, 
deliverAt);
+
         return true;
     }
 
@@ -137,6 +163,13 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
         if (log.isDebugEnabled()) {
             log.debug("[{}] Get scheduled messages - found {}", 
dispatcher.getName(), positions.size());
         }
+
+        if (priorityQueue.isEmpty()) {
+            // Reset to initial state
+            highestDeliveryTimeTracked = 0;
+            messagesHaveFixedDelay = true;
+        }
+
         updateTimer();
         return positions;
     }
@@ -241,4 +274,12 @@ public class InMemoryDelayedDeliveryTracker implements 
DelayedDeliveryTracker, T
             timeout.cancel();
         }
     }
+
+    @Override
+    public boolean shouldPauseAllDeliveries() {
+        // Pause deliveries if we know all delays are fixed within the 
lookahead window
+        return messagesHaveFixedDelay
+                && priorityQueue.size() >= 
DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
+                && !hasMessageAvailable();
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 0c7a6641216..d9f36bf0643 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -177,8 +177,7 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
                 entry.release();
                 individualAcknowledgeMessageIfNeeded(pos, 
Collections.emptyMap());
                 continue;
-            } else if (msgMetadata.hasDeliverAtTime()
-                    && trackDelayedDelivery(entry.getLedgerId(), 
entry.getEntryId(), msgMetadata)) {
+            } else if (trackDelayedDelivery(entry.getLedgerId(), 
entry.getEntryId(), msgMetadata)) {
                 // The message is marked for delayed delivery. Ignore for now.
                 entries.set(i, null);
                 entry.release();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6af58557a83..a770e76fc43 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -236,6 +236,10 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     }
 
     public synchronized void readMoreEntries() {
+        if (shouldPauseDeliveryForDelayTracker()) {
+            return;
+        }
+
         // totalAvailablePermits may be updated by other threads
         int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
         int currentTotalAvailablePermits = Math.max(totalAvailablePermits, 
firstAvailableConsumerPermits);
@@ -874,13 +878,20 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
 
         synchronized (this) {
             if (!delayedDeliveryTracker.isPresent()) {
+                if (!msgMetadata.hasDeliverAtTime()) {
+                    // No need to initialize the tracker here
+                    return false;
+                }
+
                 // Initialize the tracker the first time we need to use it
                 delayedDeliveryTracker = Optional
                         
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
             }
 
             
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
-            return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, 
msgMetadata.getDeliverAtTime());
+
+            long deliverAtTime = msgMetadata.hasDeliverAtTime() ? 
msgMetadata.getDeliverAtTime() : -1L;
+            return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, 
deliverAtTime);
         }
     }
 
@@ -895,6 +906,10 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         }
     }
 
+    protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
+        return delayedDeliveryTracker.isPresent() && 
delayedDeliveryTracker.get().shouldPauseAllDeliveries();
+    }
+
     @Override
     public synchronized long getNumberOfDelayedMessages() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index f44f61a67f9..db2db6cc1db 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -314,4 +314,117 @@ public class InMemoryDeliveryTrackerTest {
         Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> verify(dispatcher).readMoreEntries());
     }
+
+    @Test
+    public void testWithFixedDelays() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                true);
+
+        assertFalse(tracker.hasMessageAvailable());
+
+        assertTrue(tracker.addMessage(1, 1, 10));
+        assertTrue(tracker.addMessage(2, 2, 20));
+        assertTrue(tracker.addMessage(3, 3, 30));
+        assertTrue(tracker.addMessage(4, 4, 40));
+        assertTrue(tracker.addMessage(5, 5, 50));
+
+        assertFalse(tracker.hasMessageAvailable());
+        assertEquals(tracker.getNumberOfDelayedMessages(), 5);
+        assertFalse(tracker.shouldPauseAllDeliveries());
+
+        for (int i = 6; i <= 
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+            assertTrue(tracker.addMessage(i, i, i * 10));
+        }
+
+        assertTrue(tracker.shouldPauseAllDeliveries());
+
+        
clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
 * 10);
+
+        tracker.getScheduledMessages(100);
+        assertFalse(tracker.shouldPauseAllDeliveries());
+
+        // Empty the tracker
+        int removed = 0;
+        do {
+            removed = tracker.getScheduledMessages(100).size();
+        } while (removed > 0);
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+    }
+
+    @Test
+    public void testWithMixedDelays() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                true);
+
+        assertFalse(tracker.hasMessageAvailable());
+
+        assertTrue(tracker.addMessage(1, 1, 10));
+        assertTrue(tracker.addMessage(2, 2, 20));
+        assertTrue(tracker.addMessage(3, 3, 30));
+        assertTrue(tracker.addMessage(4, 4, 40));
+        assertTrue(tracker.addMessage(5, 5, 50));
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+
+        for (int i = 6; i <= 
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+            assertTrue(tracker.addMessage(i, i, i * 10));
+        }
+
+        assertTrue(tracker.shouldPauseAllDeliveries());
+
+        // Add message with earlier delivery time
+        assertTrue(tracker.addMessage(5, 5, 5));
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+    }
+
+    @Test
+    public void testWithNoDelays() throws Exception {
+        PersistentDispatcherMultipleConsumers dispatcher = 
mock(PersistentDispatcherMultipleConsumers.class);
+
+        AtomicLong clockTime = new AtomicLong();
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(x -> clockTime.get());
+
+        @Cleanup
+        InMemoryDelayedDeliveryTracker tracker = new 
InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
+                true);
+
+        assertFalse(tracker.hasMessageAvailable());
+
+        assertTrue(tracker.addMessage(1, 1, 10));
+        assertTrue(tracker.addMessage(2, 2, 20));
+        assertTrue(tracker.addMessage(3, 3, 30));
+        assertTrue(tracker.addMessage(4, 4, 40));
+        assertTrue(tracker.addMessage(5, 5, 50));
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+
+        for (int i = 6; i <= 
InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+            assertTrue(tracker.addMessage(i, i, i * 10));
+        }
+
+        assertTrue(tracker.shouldPauseAllDeliveries());
+
+        // Add message with no-delay
+        assertFalse(tracker.addMessage(5, 5, -1L));
+
+        assertFalse(tracker.shouldPauseAllDeliveries());
+    }
+
 }

Reply via email to