merlimat commented on code in PR #25681:
URL: https://github.com/apache/pulsar/pull/25681#discussion_r3191896552


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -1379,7 +1379,9 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
 
     @Override
     public long getNumberOfDelayedMessages() {
-        return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
+        synchronized (this) {

Review Comment:
   nit: this could be `synchronized` on the method itself
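
For illustration, the two forms lock the same monitor (`this`) and behave identically. The sketch below uses a hypothetical `DelayedCounter` class as a stand-in for the dispatcher; the class and its method names are assumptions, not code from the PR.

```java
import java.util.Optional;

// Hypothetical stand-in for the dispatcher; only the locking shape matters here.
class DelayedCounter {
    private Optional<Long> tracker = Optional.empty();

    // Form used in the patch: an explicit block that locks on `this`.
    public long countWithBlock() {
        synchronized (this) {
            return tracker.orElse(0L);
        }
    }

    // Form suggested in the review: the method modifier acquires the same monitor.
    public synchronized long countWithModifier() {
        return tracker.orElse(0L);
    }

    // Writer guarded by the same monitor, so both readers see a consistent value.
    public synchronized void set(long n) {
        tracker = Optional.of(n);
    }
}
```

The modifier form saves one level of nesting when the whole method body is the critical section.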



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java:
##########
@@ -152,4 +160,96 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception {
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testRaceConditionInTrackDelayedDelivery() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 2000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new AtomicReference<>();
+
+        final String topicName = newTopicName();
+        final String subscription = "s1";
+
+        // Needed to create the topic
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().get();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+
+        PersistentDispatcherMultipleConsumers dispatcher =
+            new PersistentDispatcherMultipleConsumers(topic, cursor, sub);
+
+        // Align all writes to the same bucket
+        // This is the key which triggers the race condition
+        long deliverAt = System.currentTimeMillis() + 5000;
+
+        MessageMetadata messageMetadata = new MessageMetadata()
+            .setSequenceId(1)
+            .setProducerName("testProducer")
+            .setPartitionKeyB64Encoded(false)
+            .setPublishTime(System.currentTimeMillis())
+            .setDeliverAtTime(deliverAt);
+
+        ExecutorService executorService = Executors.newFixedThreadPool(32);

Review Comment:
   That's a lot of threads for a unit test :)
   
   The executor needs to be shut down. Use `@Cleanup("shutdown")`.
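
As a rough sketch of what the annotation buys: Lombok's `@Cleanup("shutdown")` on a local variable wraps the rest of the scope in a `try`/`finally` that calls `shutdown()`. The plain-Java equivalent below shows that shape; the class name `ExecutorCleanupSketch` and the helper `runAndShutdown` are hypothetical and not part of the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper illustrating the cleanup pattern; not code from the PR.
class ExecutorCleanupSketch {
    static ExecutorService runAndShutdown(int threads) {
        // With Lombok on the classpath, the try/finally below could be replaced by:
        //   @Cleanup("shutdown")
        //   ExecutorService executorService = Executors.newFixedThreadPool(threads);
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < threads; i++) {
                executorService.submit(() -> { /* test work would go here */ });
            }
        } finally {
            // Runs even if the test body throws, so worker threads do not leak.
            executorService.shutdown();
        }
        return executorService;
    }
}
```

Without the shutdown, the pool's non-daemon worker threads would outlive the test method.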



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
