vinkal-chudgar commented on code in PR #24929:
URL: https://github.com/apache/pulsar/pull/24929#discussion_r2487668230


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java:
##########
@@ -873,36 +879,57 @@ public void testMsgDropStat() throws Exception {
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
+
+            final int threads = 10;
             @Cleanup("shutdownNow")
-            ExecutorService executor = Executors.newFixedThreadPool(10);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
             byte[] msgData = "testData".getBytes();
-            final int totalProduceMessages = 1000;
-            CountDownLatch latch = new CountDownLatch(1);
-            AtomicInteger messagesSent = new AtomicInteger(0);
-            for (int i = 0; i < totalProduceMessages; i++) {
-                executor.submit(() -> {
-                    try {
-                        MessageId msgId = producer.send(msgData);
-                        int count = messagesSent.incrementAndGet();
-                        // process at least 20% of messages before signalling the latch
-                        // a non-persistent message will return entryId as -1 when it has been dropped
-                        // due to setMaxConcurrentNonPersistentMessagePerConnection limit
-                        // also ensure that it has happened before the latch is signalled
-                        if (count > totalProduceMessages * 0.2 && msgId != null
-                                && ((MessageIdImpl) msgId).getEntryId() == -1) {
-                            latch.countDown();
+
+            /*
+             * Trigger at least one publisher drop through concurrent send() calls.
+             *
+             * Uses CyclicBarrier to ensure all threads send simultaneously, creating overlap.
+             * With maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx#handleSend
+             * drops any send while another is in-flight, returning MessageId with entryId = -1.
+             * Awaitility repeats whole bursts (bounded to 20s) until a drop is observed.
+             */
+            AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {

Review Comment:
   `Awaitility` is used here as a bounded retry, not to poll broker state. Each attempt runs a full, coordinated batch of concurrent `send()` calls, and the loop repeats until we observe a publisher-drop receipt (`MessageIdImpl.entryId == -1`) or hit the 20-second cap.
   
   Even with a `CyclicBarrier` and `maxConcurrentNonPersistentMessagePerConnection = 0`, there is a small chance that the server processes the sends of a batch sequentially, so the guard `nonPersistentPendingMessages > maxNonPersistentMessagePerConnection` is never satisfied for that batch. This can happen because non-persistent publishes complete very quickly and the completion path promptly decrements the in-flight counter (`nonPersistentPendingMessages`). This is a low-probability scheduling corner case; retrying the whole batch with `Awaitility` makes a spurious failure very unlikely within the 20-second bound and keeps the test stable.
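
For illustration only, the retry-a-whole-burst idea can be sketched in plain Java without Pulsar or Awaitility. This is not the code in the PR: `BurstRetry`, `retryUntilDrop`, and the `sendOnce` callable are hypothetical names, and `sendOnce` merely stands in for `producer.send()` by returning an entry id, with `-1` playing the role of a dropped publish. A deadline loop replaces `Awaitility.await().atMost(...)`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Pulsar or of the PR under review.
final class BurstRetry {

    /**
     * Repeats coordinated bursts of {@code threads} concurrent calls to
     * {@code sendOnce} until one call returns -1 (assumed here to mean
     * "dropped") or {@code timeout} elapses.
     */
    static boolean retryUntilDrop(int threads, Duration timeout, Callable<Long> sendOnce)
            throws Exception {
        // The pool must hold at least `threads` workers, or the barrier never trips.
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            long deadline = System.nanoTime() + timeout.toNanos();
            while (System.nanoTime() - deadline < 0) {
                // Fresh barrier per burst so every burst starts all senders together.
                CyclicBarrier barrier = new CyclicBarrier(threads);
                List<Future<Long>> results = new ArrayList<>();
                for (int i = 0; i < threads; i++) {
                    results.add(executor.submit(() -> {
                        barrier.await(5, TimeUnit.SECONDS);
                        return sendOnce.call();
                    }));
                }
                // Wait for the whole burst before deciding whether to retry.
                boolean dropSeen = false;
                for (Future<Long> result : results) {
                    if (result.get() == -1L) {
                        dropSeen = true;
                    }
                }
                if (dropSeen) {
                    return true;
                }
            }
            return false; // no drop observed within the bound
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A test would then assert that `retryUntilDrop(...)` returns `true`. The deadline is only checked between bursts, so a burst that has already started always runs to completion.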
   


