lhotari commented on code in PR #24929:
URL: https://github.com/apache/pulsar/pull/24929#discussion_r2487525948


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java:
##########
@@ -873,36 +879,57 @@ public void testMsgDropStat() throws Exception {
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
+
+            final int threads = 10;
             @Cleanup("shutdownNow")
-            ExecutorService executor = Executors.newFixedThreadPool(10);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
             byte[] msgData = "testData".getBytes();
-            final int totalProduceMessages = 1000;
-            CountDownLatch latch = new CountDownLatch(1);
-            AtomicInteger messagesSent = new AtomicInteger(0);
-            for (int i = 0; i < totalProduceMessages; i++) {
-                executor.submit(() -> {
-                    try {
-                        MessageId msgId = producer.send(msgData);
-                        int count = messagesSent.incrementAndGet();
-                        // process at least 20% of messages before signalling 
the latch
-                        // a non-persistent message will return entryId as -1 
when it has been dropped
-                        // due to 
setMaxConcurrentNonPersistentMessagePerConnection limit
-                        // also ensure that it has happened before the latch 
is signalled
-                        if (count > totalProduceMessages * 0.2 && msgId != null
-                                && ((MessageIdImpl) msgId).getEntryId() == -1) 
{
-                            latch.countDown();
+
+            /*
+             * Trigger at least one publisher drop through concurrent send() 
calls.
+             *
+             * Uses CyclicBarrier to ensure all threads send simultaneously, 
creating overlap.
+             * With maxConcurrentNonPersistentMessagePerConnection = 0, 
ServerCnx#handleSend
+             * drops any send while another is in-flight, returning MessageId 
with entryId = -1.
+             * Awaitility repeats whole bursts (bounded to 20s) until a drop 
is observed.
+             */
+            AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {

Review Comment:
   I guess this is fine if the intention is to retry for 20 second until 
something is dropped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to