chamons commented on code in PR #25681:
URL: https://github.com/apache/pulsar/pull/25681#discussion_r3196220176


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java:
##########
@@ -152,4 +160,96 @@ public void testSkipReadEntriesFromCloseCursor() throws 
Exception {
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testRaceConditionInTrackDelayedDelivery() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 2000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new 
AtomicReference<>();
+
+        final String topicName = newTopicName();
+        final String subscription = "s1";
+
+        // Needed to create the topic
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) getTopic(topicName, 
false).join().get();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+
+        PersistentDispatcherMultipleConsumers dispatcher =
+            new PersistentDispatcherMultipleConsumers(topic, cursor, sub);
+
+        // Align all writes to the same bucket
+        // This is the key which triggers the race condition
+        long deliverAt = System.currentTimeMillis() + 5000;
+
+        MessageMetadata messageMetadata = new MessageMetadata()
+            .setSequenceId(1)
+            .setProducerName("testProducer")
+            .setPartitionKeyB64Encoded(false)
+            .setPublishTime(System.currentTimeMillis())
+            .setDeliverAtTime(deliverAt);
+
+        ExecutorService executorService = Executors.newFixedThreadPool(32);

Review Comment:
   Good catch. I borrowed this from 
BucketDelayedDeliveryTrackerThreadSafetyTest but they have a cleanup method I 
missed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to