This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3327d8180cc [fix][broker] Correct two race conditions in the tracker
code and a logic bug in InMemoryDelayedDeliveryTracker that failed with
NoSuchElementException (#25681)
3327d8180cc is described below
commit 3327d8180cccc5b1af222fbe53f3db42c10202c4
Author: Chris Hamons <[email protected]>
AuthorDate: Fri May 8 15:41:23 2026 -0500
[fix][broker] Correct two race conditions in the tracker code and a logic bug
in InMemoryDelayedDeliveryTracker that failed with NoSuchElementException
(#25681)
(cherry picked from commit 6cbf4d2ad0d1514bfd2885015084102c2896ea67)
---
.../PersistentDispatcherMultipleConsumers.java | 10 +-
...rsistentDispatcherMultipleConsumersClassic.java | 10 +-
.../delayed/InMemoryDeliveryTrackerTest.java | 10 ++
...tentDispatcherMultipleConsumersClassicTest.java | 109 +++++++++++++++++++++
.../PersistentDispatcherMultipleConsumersTest.java | 109 +++++++++++++++++++++
5 files changed, 238 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b9a93e2c4fe..7d578fbe3be 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -442,7 +442,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractPersistentDis
}
}
- protected Predicate<Position>
createReadEntriesSkipConditionForNormalRead() {
+ protected synchronized Predicate<Position>
createReadEntriesSkipConditionForNormalRead() {
Predicate<Position> skipCondition = null;
// Filter out and skip read delayed messages exist in
DelayedDeliveryTracker
if (delayedDeliveryTracker.isPresent()) {
@@ -1367,12 +1367,12 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractPersistentDis
}
@Override
- public long getNumberOfDelayedMessages() {
+ public synchronized long getNumberOfDelayedMessages() {
return
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}
@Override
- public CompletableFuture<Void> clearDelayedMessages() {
+ public synchronized CompletableFuture<Void> clearDelayedMessages() {
if (!topic.isDelayedDeliveryEnabled()) {
return CompletableFuture.completedFuture(null);
}
@@ -1451,11 +1451,11 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractPersistentDis
}
- public long getDelayedTrackerMemoryUsage() {
+ public synchronized long getDelayedTrackerMemoryUsage() {
return
delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
}
- public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
+ public synchronized Map<String, TopicMetricBean>
getBucketDelayedIndexStats() {
if (delayedDeliveryTracker.isEmpty()) {
return Collections.emptyMap();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
index 0746b7215b1..e87b789d8b9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
@@ -1137,7 +1137,7 @@ public class PersistentDispatcherMultipleConsumersClassic
extends AbstractPersis
}
@Override
- public boolean trackDelayedDelivery(long ledgerId, long entryId,
MessageMetadata msgMetadata) {
+ public synchronized boolean trackDelayedDelivery(long ledgerId, long
entryId, MessageMetadata msgMetadata) {
if (!topic.isDelayedDeliveryEnabled()) {
// If broker has the feature disabled, always deliver messages
immediately
return false;
@@ -1203,12 +1203,12 @@ public class
PersistentDispatcherMultipleConsumersClassic extends AbstractPersis
}
@Override
- public long getNumberOfDelayedMessages() {
+ public synchronized long getNumberOfDelayedMessages() {
return
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}
@Override
- public CompletableFuture<Void> clearDelayedMessages() {
+ public synchronized CompletableFuture<Void> clearDelayedMessages() {
if (!topic.isDelayedDeliveryEnabled()) {
return CompletableFuture.completedFuture(null);
}
@@ -1282,11 +1282,11 @@ public class
PersistentDispatcherMultipleConsumersClassic extends AbstractPersis
}
- public long getDelayedTrackerMemoryUsage() {
+ public synchronized long getDelayedTrackerMemoryUsage() {
return
delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
}
- public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
+ public synchronized Map<String, TopicMetricBean>
getBucketDelayedIndexStats() {
if (delayedDeliveryTracker.isEmpty()) {
return Collections.emptyMap();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index 7d9c8e8bedd..0e3f130cbf1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -249,4 +249,20 @@ public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest {
         assertNull(exceptions[0]);
     }
 
+    /**
+     * Regression test for the tracker failing with {@code NoSuchElementException}: addMessage is
+     * called three times with identical arguments (1, 1, 50), the test clock is then moved to 60,
+     * and {@code getScheduledMessages} must complete without throwing.
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void testAddMultipleMessagesSameWindow(InMemoryDelayedDeliveryTracker tracker) throws Exception {
+        for (int attempt = 0; attempt < 3; attempt++) {
+            tracker.addMessage(1, 1, 50);
+        }
+
+        clockTime.set(60);
+
+        // Intentionally no assertion on the result: reaching the end without an exception is the check.
+        tracker.getScheduledMessages(10);
+    }
 }
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
index 835747bda1e..bc027887082 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
@@ -20,7 +20,14 @@ package org.apache.pulsar.broker.service.persistent;
import com.carrotsearch.hppc.ObjectSet;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -29,12 +36,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -172,4 +181,121 @@ public class PersistentDispatcherMultipleConsumersClassicTest extends ProducerCo
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    /**
+     * Concurrently calls {@code clearDelayedMessages()} and {@code trackDelayedDelivery(...)} on one
+     * dispatcher, with every tracked message using the same delivery time, and fails if any call throws.
+     */
+    @Test
+    public void testRaceConditionInTrackDelayedDelivery() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 2000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        // Throwable, not Exception: an Error thrown by a worker would otherwise be swallowed by the
+        // executor's Future and the test would still pass.
+        final AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final String topicName = newTopicName();
+        final String subscription = "s1";
+
+        // Needed to create the topic; closed automatically when the test ends.
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().orElseThrow();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+
+        PersistentDispatcherMultipleConsumersClassic dispatcher =
+                new PersistentDispatcherMultipleConsumersClassic(topic, cursor, sub);
+
+        // Align all writes to the same bucket
+        // This is the key which triggers the race condition
+        long deliverAt = System.currentTimeMillis() + 5000;
+
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis())
+                .setDeliverAtTime(deliverAt);
+
+        // One thread per worker. shutdownNow interrupts workers that are still running if the wait
+        // below times out, so they stop mutating the dispatcher once the test is over.
+        @Cleanup("shutdownNow")
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+
+        // Start clear message threads
+        for (int i = 0; i < numThreads / 2; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        // Wait on the returned future so a failure reported through it is not lost.
+                        dispatcher.clearDelayedMessages().get(10, TimeUnit.SECONDS);
+                        Thread.sleep(1);
+                    }
+                } catch (Throwable t) {
+                    recordFailure(t, errors, firstException);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start track delayed delivery threads
+        for (int i = numThreads / 2; i < numThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.trackDelayedDelivery(1, 1, messageMetadata);
+                        Thread.sleep(1);
+                    }
+                } catch (Throwable t) {
+                    recordFailure(t, errors, firstException);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");
+
+        Throwable firstFailure = firstException.get();
+        if (firstFailure != null) {
+            // Attach the first failure as the cause so its stack trace shows up in the test report.
+            throw new AssertionError("No exceptions should occur during concurrent operations, but "
+                    + errors.get() + " occurred; the first one is attached as the cause", firstFailure);
+        }
+    }
+
+    /**
+     * Counts a worker failure, remembers the first one seen, and restores the interrupt flag if the
+     * failure was an interruption.
+     */
+    private static void recordFailure(Throwable failure, AtomicInteger errors,
+            AtomicReference<Throwable> firstFailure) {
+        if (failure instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+        errors.incrementAndGet();
+        firstFailure.compareAndSet(null, failure);
+    }
+
+    /**
+     * Looks up a topic by name, optionally creating it if it doesn't exist.
+     */
+    protected CompletableFuture<Optional<Topic>> getTopic(String topic, boolean createIfMissing) {
+        return pulsar.getBrokerService().getTopic(topic, createIfMissing);
+    }
 }
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
index f7293497efd..fdb9fa9109a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
@@ -20,7 +20,14 @@ package org.apache.pulsar.broker.service.persistent;
import com.carrotsearch.hppc.ObjectSet;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -29,12 +36,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -172,4 +181,121 @@ public class PersistentDispatcherMultipleConsumersTest extends ProducerConsumerB
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    /**
+     * Concurrently calls {@code clearDelayedMessages()} and {@code trackDelayedDelivery(...)} on one
+     * dispatcher, with every tracked message using the same delivery time, and fails if any call throws.
+     */
+    @Test
+    public void testRaceConditionInTrackDelayedDelivery() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 2000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        // Throwable, not Exception: an Error thrown by a worker would otherwise be swallowed by the
+        // executor's Future and the test would still pass.
+        final AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final String topicName = newTopicName();
+        final String subscription = "s1";
+
+        // Needed to create the topic; closed automatically when the test ends.
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().orElseThrow();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+
+        PersistentDispatcherMultipleConsumers dispatcher =
+                new PersistentDispatcherMultipleConsumers(topic, cursor, sub);
+
+        // Align all writes to the same bucket
+        // This is the key which triggers the race condition
+        long deliverAt = System.currentTimeMillis() + 5000;
+
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis())
+                .setDeliverAtTime(deliverAt);
+
+        // One thread per worker. shutdownNow interrupts workers that are still running if the wait
+        // below times out, so they stop mutating the dispatcher once the test is over.
+        @Cleanup("shutdownNow")
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+
+        // Start clear message threads
+        for (int i = 0; i < numThreads / 2; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        // Wait on the returned future so a failure reported through it is not lost.
+                        dispatcher.clearDelayedMessages().get(10, TimeUnit.SECONDS);
+                        Thread.sleep(1);
+                    }
+                } catch (Throwable t) {
+                    recordFailure(t, errors, firstException);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start track delayed delivery threads
+        for (int i = numThreads / 2; i < numThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.trackDelayedDelivery(1, 1, messageMetadata);
+                        Thread.sleep(1);
+                    }
+                } catch (Throwable t) {
+                    recordFailure(t, errors, firstException);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");
+
+        Throwable firstFailure = firstException.get();
+        if (firstFailure != null) {
+            // Attach the first failure as the cause so its stack trace shows up in the test report.
+            throw new AssertionError("No exceptions should occur during concurrent operations, but "
+                    + errors.get() + " occurred; the first one is attached as the cause", firstFailure);
+        }
+    }
+
+    /**
+     * Counts a worker failure, remembers the first one seen, and restores the interrupt flag if the
+     * failure was an interruption.
+     */
+    private static void recordFailure(Throwable failure, AtomicInteger errors,
+            AtomicReference<Throwable> firstFailure) {
+        if (failure instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+        errors.incrementAndGet();
+        firstFailure.compareAndSet(null, failure);
+    }
+
+    /**
+     * Looks up a topic by name, optionally creating it if it doesn't exist.
+     */
+    protected CompletableFuture<Optional<Topic>> getTopic(String topic, boolean createIfMissing) {
+        return pulsar.getBrokerService().getTopic(topic, createIfMissing);
+    }
 }