This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e953228fe3331a11a7b0c6c500d8c9c1044bde9f Author: Chris Hamons <[email protected]> AuthorDate: Fri May 8 15:41:23 2026 -0500 [fix][broker] Correct two race conditions in the tracker code and a logic bug in InMemoryDelayedDeliveryTracker that failed with NoSuchElementException (#25681) (cherry picked from commit 6cbf4d2ad0d1514bfd2885015084102c2896ea67) --- .../delayed/InMemoryDelayedDeliveryTracker.java | 14 ++- .../PersistentDispatcherMultipleConsumers.java | 10 +-- ...rsistentDispatcherMultipleConsumersClassic.java | 10 +-- .../delayed/InMemoryDeliveryTrackerTest.java | 10 +++ ...tentDispatcherMultipleConsumersClassicTest.java | 99 ++++++++++++++++++++++ .../PersistentDispatcherMultipleConsumersTest.java | 99 ++++++++++++++++++++++ 6 files changed, 228 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index ad5ab25fbbf..1ded2cc428d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -126,10 +126,16 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack } long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); - delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) - .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) - .add(entryId); - delayedMessagesCount.incrementAndGet(); + Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) + .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + // Roaring64Bitmap does not store duplicates, so track whether it is a new element + // so we can keep delayedMessagesCount in sync + boolean isNew = !bitmap.contains(entryId); + + if (isNew) { + bitmap.add(entryId); 
+ delayedMessagesCount.incrementAndGet(); + } updateTimer(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 220b9e23a3e..cfabc9333ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -442,7 +442,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis } } - protected Predicate<Position> createReadEntriesSkipConditionForNormalRead() { + protected synchronized Predicate<Position> createReadEntriesSkipConditionForNormalRead() { Predicate<Position> skipCondition = null; // Filter out and skip read delayed messages exist in DelayedDeliveryTracker if (delayedDeliveryTracker.isPresent()) { @@ -1368,12 +1368,12 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis } @Override - public long getNumberOfDelayedMessages() { + public synchronized long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @Override - public CompletableFuture<Void> clearDelayedMessages() { + public synchronized CompletableFuture<Void> clearDelayedMessages() { if (!topic.isDelayedDeliveryEnabled()) { return CompletableFuture.completedFuture(null); } @@ -1452,11 +1452,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis } - public long getDelayedTrackerMemoryUsage() { + public synchronized long getDelayedTrackerMemoryUsage() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } - public Map<String, TopicMetricBean> getBucketDelayedIndexStats() { + public synchronized Map<String, 
TopicMetricBean> getBucketDelayedIndexStats() { if (delayedDeliveryTracker.isEmpty()) { return Collections.emptyMap(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index d828f338132..28dd52f89d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -1138,7 +1138,7 @@ public class PersistentDispatcherMultipleConsumersClassic extends AbstractPersis } @Override - public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { + public synchronized boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { if (!topic.isDelayedDeliveryEnabled()) { // If broker has the feature disabled, always deliver messages immediately return false; @@ -1204,12 +1204,12 @@ public class PersistentDispatcherMultipleConsumersClassic extends AbstractPersis } @Override - public long getNumberOfDelayedMessages() { + public synchronized long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @Override - public CompletableFuture<Void> clearDelayedMessages() { + public synchronized CompletableFuture<Void> clearDelayedMessages() { if (!topic.isDelayedDeliveryEnabled()) { return CompletableFuture.completedFuture(null); } @@ -1283,11 +1283,11 @@ public class PersistentDispatcherMultipleConsumersClassic extends AbstractPersis } - public long getDelayedTrackerMemoryUsage() { + public synchronized long getDelayedTrackerMemoryUsage() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } - public 
Map<String, TopicMetricBean> getBucketDelayedIndexStats() { + public synchronized Map<String, TopicMetricBean> getBucketDelayedIndexStats() { if (delayedDeliveryTracker.isEmpty()) { return Collections.emptyMap(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index c5a564d1b66..2affb7fef22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -274,4 +274,14 @@ public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest { tracker.close(); } + @Test(dataProvider = "delayedTracker") + public void testAddMultipleMessagesSameWindow(InMemoryDelayedDeliveryTracker tracker) throws Exception { + tracker.addMessage(1, 1, 50); + tracker.addMessage(1, 1, 50); + tracker.addMessage(1, 1, 50); + + clockTime.set(60); + + tracker.getScheduledMessages(10); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java index 52b36493394..879cbecd61e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java @@ -20,7 +20,12 @@ package org.apache.pulsar.broker.service.persistent; import com.carrotsearch.hppc.ObjectSet; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -34,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; import org.testng.Assert; @@ -152,4 +158,97 @@ public class PersistentDispatcherMultipleConsumersClassicTest extends SharedPuls // Verify: the topic can be deleted successfully. admin.topics().delete(topicName, false); } + + @Test + public void testRaceConditionInTrackDelayedDelivery() throws Exception { + final int numThreads = 16; + final int operationsPerThread = 2000; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final AtomicInteger errors = new AtomicInteger(0); + final AtomicReference<Exception> firstException = new AtomicReference<>(); + + final String topicName = newTopicName(); + final String subscription = "s1"; + + // Needed to create the topic + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().get(); + + ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class); + Mockito.doReturn(subscription).when(cursor).getName(); + + Subscription sub = Mockito.mock(PersistentSubscription.class); + Mockito.doReturn(topic).when(sub).getTopic(); + + PersistentDispatcherMultipleConsumersClassic dispatcher = + new PersistentDispatcherMultipleConsumersClassic(topic, cursor, sub); + + // Align all writes to the same bucket + // This is the key which 
triggers the race condition + long deliverAt = System.currentTimeMillis() + 5000; + + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(1) + .setProducerName("testProducer") + .setPartitionKeyB64Encoded(false) + .setPublishTime(System.currentTimeMillis()) + .setDeliverAtTime(deliverAt); + + @Cleanup("shutdown") + ExecutorService executorService = Executors.newFixedThreadPool(32); + + // Start clear message thread + for (int i = 0; i < numThreads / 2; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.clearDelayedMessages(); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start track delayed delivery thread + for (int i = numThreads / 2; i < numThreads; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.trackDelayedDelivery(1, 1, messageMetadata); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds"); + + if (errors.get() > 0) { + Exception exception = firstException.get(); + if (exception != null) { + System.err.println("First exception caught: " + exception.getMessage()); + exception.printStackTrace(); + } + } + Assert.assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations"); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java index 
4cba12fc3df..79fd3b03d1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -20,7 +20,12 @@ package org.apache.pulsar.broker.service.persistent; import com.carrotsearch.hppc.ObjectSet; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -34,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; import org.testng.Assert; @@ -152,4 +158,97 @@ public class PersistentDispatcherMultipleConsumersTest extends SharedPulsarBaseT // Verify: the topic can be deleted successfully. 
admin.topics().delete(topicName, false); } + + @Test + public void testRaceConditionInTrackDelayedDelivery() throws Exception { + final int numThreads = 16; + final int operationsPerThread = 2000; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final AtomicInteger errors = new AtomicInteger(0); + final AtomicReference<Exception> firstException = new AtomicReference<>(); + + final String topicName = newTopicName(); + final String subscription = "s1"; + + // Needed to create the topic + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().get(); + + ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class); + Mockito.doReturn(subscription).when(cursor).getName(); + + Subscription sub = Mockito.mock(PersistentSubscription.class); + Mockito.doReturn(topic).when(sub).getTopic(); + + PersistentDispatcherMultipleConsumers dispatcher = + new PersistentDispatcherMultipleConsumers(topic, cursor, sub); + + // Align all writes to the same bucket + // This is the key which triggers the race condition + long deliverAt = System.currentTimeMillis() + 5000; + + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(1) + .setProducerName("testProducer") + .setPartitionKeyB64Encoded(false) + .setPublishTime(System.currentTimeMillis()) + .setDeliverAtTime(deliverAt); + + @Cleanup("shutdown") + ExecutorService executorService = Executors.newFixedThreadPool(32); + + // Start clear message thread + for (int i = 0; i < numThreads / 2; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.clearDelayedMessages(); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + 
firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start track delayed delivery thread + for (int i = numThreads / 2; i < numThreads; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.trackDelayedDelivery(1, 1, messageMetadata); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds"); + + if (errors.get() > 0) { + Exception exception = firstException.get(); + if (exception != null) { + System.err.println("First exception caught: " + exception.getMessage()); + exception.printStackTrace(); + } + } + Assert.assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations"); + } }
