This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 2b012e736a0 [fix][broker] Correct two race conditions in the tracker 
code and a logic bug in InMemoryDelayedDeliveryTracker that failed with 
NoSuchElementException (#25681)
2b012e736a0 is described below

commit 2b012e736a0e483754d87e07886158d1f05b4081
Author: Chris Hamons <[email protected]>
AuthorDate: Fri May 8 15:41:23 2026 -0500

    [fix][broker] Correct two race conditions in the tracker code and a logic bug 
in InMemoryDelayedDeliveryTracker that failed with NoSuchElementException 
(#25681)
    
    (cherry picked from commit 6cbf4d2ad0d1514bfd2885015084102c2896ea67)
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  14 ++-
 .../PersistentDispatcherMultipleConsumers.java     |  10 +-
 ...rsistentDispatcherMultipleConsumersClassic.java |  10 +-
 .../delayed/InMemoryDeliveryTrackerTest.java       |  10 ++
 ...tentDispatcherMultipleConsumersClassicTest.java | 109 +++++++++++++++++++++
 .../PersistentDispatcherMultipleConsumersTest.java | 109 +++++++++++++++++++++
 6 files changed, 248 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index ad5ab25fbbf..1ded2cc428d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -126,10 +126,16 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
         }
 
         long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
-        delayedMessageMap.computeIfAbsent(timestamp, k -> new 
Long2ObjectRBTreeMap<>())
-                .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
-                .add(entryId);
-        delayedMessagesCount.incrementAndGet();
+        Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, 
k -> new Long2ObjectRBTreeMap<>())
+            .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
+        // Roaring64Bitmap does not store duplicates, so track whether it is a new 
element
+        // so we can keep delayedMessagesCount in sync
+        boolean isNew = !bitmap.contains(entryId);
+
+        if (isNew) {
+            bitmap.add(entryId);
+            delayedMessagesCount.incrementAndGet();
+        }
 
         updateTimer();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4f33e3e379b..1d169ecee3f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -441,7 +441,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
         }
     }
 
-    protected Predicate<Position> 
createReadEntriesSkipConditionForNormalRead() {
+    protected synchronized Predicate<Position> 
createReadEntriesSkipConditionForNormalRead() {
         Predicate<Position> skipCondition = null;
         // Filter out and skip read delayed messages exist in 
DelayedDeliveryTracker
         if (delayedDeliveryTracker.isPresent()) {
@@ -1367,12 +1367,12 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractPersistentDis
     }
 
     @Override
-    public long getNumberOfDelayedMessages() {
+    public synchronized long getNumberOfDelayedMessages() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
     }
 
     @Override
-    public CompletableFuture<Void> clearDelayedMessages() {
+    public synchronized CompletableFuture<Void> clearDelayedMessages() {
         if (!topic.isDelayedDeliveryEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
@@ -1451,11 +1451,11 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractPersistentDis
     }
 
 
-    public long getDelayedTrackerMemoryUsage() {
+    public synchronized long getDelayedTrackerMemoryUsage() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
     }
 
-    public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
+    public synchronized Map<String, TopicMetricBean> 
getBucketDelayedIndexStats() {
         if (delayedDeliveryTracker.isEmpty()) {
             return Collections.emptyMap();
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
index 0746b7215b1..e87b789d8b9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
@@ -1137,7 +1137,7 @@ public class PersistentDispatcherMultipleConsumersClassic 
extends AbstractPersis
     }
 
     @Override
-    public boolean trackDelayedDelivery(long ledgerId, long entryId, 
MessageMetadata msgMetadata) {
+    public synchronized boolean trackDelayedDelivery(long ledgerId, long 
entryId, MessageMetadata msgMetadata) {
         if (!topic.isDelayedDeliveryEnabled()) {
             // If broker has the feature disabled, always deliver messages 
immediately
             return false;
@@ -1203,12 +1203,12 @@ public class 
PersistentDispatcherMultipleConsumersClassic extends AbstractPersis
     }
 
     @Override
-    public long getNumberOfDelayedMessages() {
+    public synchronized long getNumberOfDelayedMessages() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
     }
 
     @Override
-    public CompletableFuture<Void> clearDelayedMessages() {
+    public synchronized CompletableFuture<Void> clearDelayedMessages() {
         if (!topic.isDelayedDeliveryEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
@@ -1282,11 +1282,11 @@ public class 
PersistentDispatcherMultipleConsumersClassic extends AbstractPersis
     }
 
 
-    public long getDelayedTrackerMemoryUsage() {
+    public synchronized long getDelayedTrackerMemoryUsage() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
     }
 
-    public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
+    public synchronized Map<String, TopicMetricBean> 
getBucketDelayedIndexStats() {
         if (delayedDeliveryTracker.isEmpty()) {
             return Collections.emptyMap();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index c5a564d1b66..2affb7fef22 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -274,4 +274,14 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
         tracker.close();
     }
 
+    @Test(dataProvider = "delayedTracker")
+    public void 
testAddMultipleMessagesSameWindow(InMemoryDelayedDeliveryTracker tracker) 
throws Exception {
+        tracker.addMessage(1, 1, 50);
+        tracker.addMessage(1, 1, 50);
+        tracker.addMessage(1, 1, 50);
+
+        clockTime.set(60);
+
+        tracker.getScheduledMessages(10);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
index 835747bda1e..bc027887082 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
@@ -20,7 +20,14 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.carrotsearch.hppc.ObjectSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -29,12 +36,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -172,4 +181,104 @@ public class 
PersistentDispatcherMultipleConsumersClassicTest extends ProducerCo
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testRaceConditionInTrackDelayedDelivery() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 2000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new 
AtomicReference<>();
+
+        final String topicName = newTopicName();
+        final String subscription = "s1";
+
+        // Needed to create the topic
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) getTopic(topicName, 
false).join().get();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+
+        PersistentDispatcherMultipleConsumersClassic dispatcher =
+            new PersistentDispatcherMultipleConsumersClassic(topic, cursor, 
sub);
+
+        // Align all writes to the same bucket
+        // This is the key which triggers the race condition
+        long deliverAt = System.currentTimeMillis() + 5000;
+
+        MessageMetadata messageMetadata = new MessageMetadata()
+            .setSequenceId(1)
+            .setProducerName("testProducer")
+            .setPartitionKeyB64Encoded(false)
+            .setPublishTime(System.currentTimeMillis())
+            .setDeliverAtTime(deliverAt);
+
+        @Cleanup("shutdown")
+        ExecutorService executorService = Executors.newFixedThreadPool(32);
+
+        // Start clear message thread
+        for (int i = 0; i < numThreads / 2; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.clearDelayedMessages();
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start track delayed delivery thread
+        for (int i = numThreads / 2; i < numThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.trackDelayedDelivery(1, 1, messageMetadata);
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should 
complete within 30 seconds");
+
+        if (errors.get() > 0) {
+            Exception exception = firstException.get();
+            if (exception != null) {
+                System.err.println("First exception caught: " + 
exception.getMessage());
+                exception.printStackTrace();
+            }
+        }
+        Assert.assertEquals(errors.get(), 0, "No exceptions should occur 
during concurrent operations");
+    }
+
+    /**
+     * Looks up a topic by name, optionally creating it if it doesn't exist.
+     */
+    protected CompletableFuture<Optional<Topic>> getTopic(String topic, 
boolean createIfMissing) {
+        return pulsar.getBrokerService().getTopic(topic, createIfMissing);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
index f7293497efd..fdb9fa9109a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
@@ -20,7 +20,14 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.carrotsearch.hppc.ObjectSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -29,12 +36,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -172,4 +181,104 @@ public class PersistentDispatcherMultipleConsumersTest 
extends ProducerConsumerB
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testRaceConditionInTrackDelayedDelivery() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 2000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new 
AtomicReference<>();
+
+        final String topicName = newTopicName();
+        final String subscription = "s1";
+
+        // Needed to create the topic
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) getTopic(topicName, 
false).join().get();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+
+        PersistentDispatcherMultipleConsumers dispatcher =
+            new PersistentDispatcherMultipleConsumers(topic, cursor, sub);
+
+        // Align all writes to the same bucket
+        // This is the key which triggers the race condition
+        long deliverAt = System.currentTimeMillis() + 5000;
+
+        MessageMetadata messageMetadata = new MessageMetadata()
+            .setSequenceId(1)
+            .setProducerName("testProducer")
+            .setPartitionKeyB64Encoded(false)
+            .setPublishTime(System.currentTimeMillis())
+            .setDeliverAtTime(deliverAt);
+
+        @Cleanup("shutdown")
+        ExecutorService executorService = Executors.newFixedThreadPool(32);
+
+        // Start clear message thread
+        for (int i = 0; i < numThreads / 2; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.clearDelayedMessages();
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start track delayed delivery thread
+        for (int i = numThreads / 2; i < numThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.trackDelayedDelivery(1, 1, messageMetadata);
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should 
complete within 30 seconds");
+
+        if (errors.get() > 0) {
+            Exception exception = firstException.get();
+            if (exception != null) {
+                System.err.println("First exception caught: " + 
exception.getMessage());
+                exception.printStackTrace();
+            }
+        }
+        Assert.assertEquals(errors.get(), 0, "No exceptions should occur 
during concurrent operations");
+    }
+
+    /**
+     * Looks up a topic by name, optionally creating it if it doesn't exist.
+     */
+    protected CompletableFuture<Optional<Topic>> getTopic(String topic, 
boolean createIfMissing) {
+        return pulsar.getBrokerService().getTopic(topic, createIfMissing);
+    }
 }

Reply via email to