lhotari commented on code in PR #24929:
URL: https://github.com/apache/pulsar/pull/24929#discussion_r2485856933


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java:
##########
@@ -873,36 +879,57 @@ public void testMsgDropStat() throws Exception {
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
+
+            final int threads = 10;
             @Cleanup("shutdownNow")
-            ExecutorService executor = Executors.newFixedThreadPool(10);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
             byte[] msgData = "testData".getBytes();
-            final int totalProduceMessages = 1000;
-            CountDownLatch latch = new CountDownLatch(1);
-            AtomicInteger messagesSent = new AtomicInteger(0);
-            for (int i = 0; i < totalProduceMessages; i++) {
-                executor.submit(() -> {
-                    try {
-                        MessageId msgId = producer.send(msgData);
-                        int count = messagesSent.incrementAndGet();
-                        // process at least 20% of messages before signalling 
the latch
-                        // a non-persistent message will return entryId as -1 
when it has been dropped
-                        // due to 
setMaxConcurrentNonPersistentMessagePerConnection limit
-                        // also ensure that it has happened before the latch 
is signalled
-                        if (count > totalProduceMessages * 0.2 && msgId != null
-                                && ((MessageIdImpl) msgId).getEntryId() == -1) 
{
-                            latch.countDown();
+
+            /*
+             * Trigger at least one publisher drop through concurrent send() 
calls.
+             *
+             * Uses CyclicBarrier to ensure all threads send simultaneously, 
creating overlap.
+             * With maxConcurrentNonPersistentMessagePerConnection = 0, 
ServerCnx#handleSend
+             * drops any send while another is in-flight, returning MessageId 
with entryId = -1.
+             * Awaitility repeats whole bursts (bounded to 20s) until a drop 
is observed.
+             */
+            AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+                CyclicBarrier barrier = new CyclicBarrier(threads);
+                CountDownLatch completionLatch = new CountDownLatch(threads);
+                AtomicReference<Throwable> error = new AtomicReference<>();
+                publisherDropSeen.set(false);
+
+                for (int i = 0; i < threads; i++) {
+                    executor.submit(() -> {
+                        try {
+                            barrier.await();
+                            MessageId msgId = producer.send(msgData);
+                            // Publisher drop is signaled by 
MessageIdImpl.entryId == -1
+                            if (msgId instanceof MessageIdImpl && 
((MessageIdImpl) msgId).getEntryId() == -1) {
+                                publisherDropSeen.set(true);
+                            }
+                        } catch (Throwable t) {
+                            if (t instanceof InterruptedException) {
+                                Thread.currentThread().interrupt();
+                            }
+                            error.compareAndSet(null, t);
+                        } finally {
+                            completionLatch.countDown();
                         }
+                    });
+                }
 
-                        Thread.sleep(10);
-                    } catch (PulsarClientException e) {
-                        throw new RuntimeException(e);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    }
-                });
-            }
-            assertTrue(latch.await(5, TimeUnit.SECONDS));
+                // Wait for all sends to complete.
+                completionLatch.await();
+
+                if (error.get() != null) {
+                    throw new AssertionError("Concurrent send encountered an 
exception", error.get());
+                }

Review Comment:
   Use `assertNull` here instead of the manual null check that throws an `AssertionError`.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java:
##########
@@ -873,36 +879,57 @@ public void testMsgDropStat() throws Exception {
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
+
+            final int threads = 10;
             @Cleanup("shutdownNow")
-            ExecutorService executor = Executors.newFixedThreadPool(10);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
             byte[] msgData = "testData".getBytes();
-            final int totalProduceMessages = 1000;
-            CountDownLatch latch = new CountDownLatch(1);
-            AtomicInteger messagesSent = new AtomicInteger(0);
-            for (int i = 0; i < totalProduceMessages; i++) {
-                executor.submit(() -> {
-                    try {
-                        MessageId msgId = producer.send(msgData);
-                        int count = messagesSent.incrementAndGet();
-                        // process at least 20% of messages before signalling 
the latch
-                        // a non-persistent message will return entryId as -1 
when it has been dropped
-                        // due to 
setMaxConcurrentNonPersistentMessagePerConnection limit
-                        // also ensure that it has happened before the latch 
is signalled
-                        if (count > totalProduceMessages * 0.2 && msgId != null
-                                && ((MessageIdImpl) msgId).getEntryId() == -1) 
{
-                            latch.countDown();
+
+            /*
+             * Trigger at least one publisher drop through concurrent send() 
calls.
+             *
+             * Uses CyclicBarrier to ensure all threads send simultaneously, 
creating overlap.
+             * With maxConcurrentNonPersistentMessagePerConnection = 0, 
ServerCnx#handleSend
+             * drops any send while another is in-flight, returning MessageId 
with entryId = -1.
+             * Awaitility repeats whole bursts (bounded to 20s) until a drop 
is observed.
+             */
+            AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {

Review Comment:
   I don't think using Awaitility is needed here. Awaitility is only used when 
there's some sort of polling and retry loop.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java:
##########
@@ -873,36 +879,57 @@ public void testMsgDropStat() throws Exception {
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
+
+            final int threads = 10;
             @Cleanup("shutdownNow")
-            ExecutorService executor = Executors.newFixedThreadPool(10);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
             byte[] msgData = "testData".getBytes();
-            final int totalProduceMessages = 1000;
-            CountDownLatch latch = new CountDownLatch(1);
-            AtomicInteger messagesSent = new AtomicInteger(0);
-            for (int i = 0; i < totalProduceMessages; i++) {
-                executor.submit(() -> {
-                    try {
-                        MessageId msgId = producer.send(msgData);
-                        int count = messagesSent.incrementAndGet();
-                        // process at least 20% of messages before signalling 
the latch
-                        // a non-persistent message will return entryId as -1 
when it has been dropped
-                        // due to 
setMaxConcurrentNonPersistentMessagePerConnection limit
-                        // also ensure that it has happened before the latch 
is signalled
-                        if (count > totalProduceMessages * 0.2 && msgId != null
-                                && ((MessageIdImpl) msgId).getEntryId() == -1) 
{
-                            latch.countDown();
+
+            /*
+             * Trigger at least one publisher drop through concurrent send() 
calls.
+             *
+             * Uses CyclicBarrier to ensure all threads send simultaneously, 
creating overlap.
+             * With maxConcurrentNonPersistentMessagePerConnection = 0, 
ServerCnx#handleSend
+             * drops any send while another is in-flight, returning MessageId 
with entryId = -1.
+             * Awaitility repeats whole bursts (bounded to 20s) until a drop 
is observed.
+             */
+            AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+                CyclicBarrier barrier = new CyclicBarrier(threads);
+                CountDownLatch completionLatch = new CountDownLatch(threads);
+                AtomicReference<Throwable> error = new AtomicReference<>();
+                publisherDropSeen.set(false);
+
+                for (int i = 0; i < threads; i++) {
+                    executor.submit(() -> {
+                        try {
+                            barrier.await();
+                            MessageId msgId = producer.send(msgData);
+                            // Publisher drop is signaled by 
MessageIdImpl.entryId == -1
+                            if (msgId instanceof MessageIdImpl && 
((MessageIdImpl) msgId).getEntryId() == -1) {
+                                publisherDropSeen.set(true);
+                            }
+                        } catch (Throwable t) {
+                            if (t instanceof InterruptedException) {
+                                Thread.currentThread().interrupt();
+                            }
+                            error.compareAndSet(null, t);
+                        } finally {
+                            completionLatch.countDown();
                         }
+                    });
+                }
 
-                        Thread.sleep(10);
-                    } catch (PulsarClientException e) {
-                        throw new RuntimeException(e);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    }
-                });
-            }
-            assertTrue(latch.await(5, TimeUnit.SECONDS));
+                // Wait for all sends to complete.
+                completionLatch.await();

Review Comment:
   ```suggestion
                   assertTrue(completionLatch.await(20, TimeUnit.SECONDS));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to