This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 810fa1f0c44270dd7d5b3427ff2aad59c9924621
Author: Vinkal <[email protected]>
AuthorDate: Tue Nov 4 15:46:28 2025 +0530

    [fix][test] Stabilize testMsgDropStat by reliably triggering non-persistent 
publisher drop (#24929)
    
    Signed-off-by: Vinkal Chudgar <[email protected]>
    (cherry picked from commit 60acfba3aec83f7cb4b6aebb274d203893b4b65b)
---
 .../pulsar/client/api/NonPersistentTopicTest.java  | 93 ++++++++++++++--------
 1 file changed, 59 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 5ebc28335f9..aa2a6efe117 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -28,17 +28,19 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.net.URL;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
@@ -813,18 +815,22 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
     }
 
     /**
-     * Verifies msg-drop stats
+     * Verifies msg-drop stats.
      *
      * @throws Exception
      */
-    @Test
+    @Test(timeOut = 60000)
     public void testMsgDropStat() throws Exception {
 
         int defaultNonPersistentMessageRate = 
conf.getMaxConcurrentNonPersistentMessagePerConnection();
         try {
             final String topicName = 
BrokerTestUtil.newUniqueName("non-persistent://my-property/my-ns/stats-topic");
-            // restart broker with lower publish rate limit
-            conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
+
+            // For non-persistent topics, set the per-connection in-flight 
limit to 0.
+            // Since ServerCnx drops when inFlight > max; with max=0, any 
second overlapping send on the
+            // same connection is dropped (entryId == -1) and recorded. This 
makes observing a publisher drop
+            // reliable in this test.
+            conf.setMaxConcurrentNonPersistentMessagePerConnection(0);
             stopBroker();
             startBroker();
 
@@ -840,39 +846,58 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
 
             @Cleanup
             ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
-                .enableBatching(false)
-                .messageRoutingMode(MessageRoutingMode.SinglePartition)
-                .create();
+                    .enableBatching(false)
+                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                    .create();
+
+            final int threads = 10;
             @Cleanup("shutdownNow")
-            ExecutorService executor = Executors.newFixedThreadPool(10);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
             byte[] msgData = "testData".getBytes();
-            final int totalProduceMessages = 1000;
-            CountDownLatch latch = new CountDownLatch(1);
-            AtomicInteger messagesSent = new AtomicInteger(0);
-            for (int i = 0; i < totalProduceMessages; i++) {
-                executor.submit(() -> {
-                    try {
-                        MessageId msgId = producer.send(msgData);
-                        int count = messagesSent.incrementAndGet();
-                        // process at least 20% of messages before signalling 
the latch
-                        // a non-persistent message will return entryId as -1 
when it has been dropped
-                        // due to 
setMaxConcurrentNonPersistentMessagePerConnection limit
-                        // also ensure that it has happened before the latch 
is signalled
-                        if (count > totalProduceMessages * 0.2 && msgId != null
-                                && ((MessageIdImpl) msgId).getEntryId() == -1) 
{
-                            latch.countDown();
+
+            /*
+             * Trigger at least one publisher drop through concurrent send() 
calls.
+             *
+             * Uses CyclicBarrier to ensure all threads send simultaneously, 
creating overlap.
+             * With maxConcurrentNonPersistentMessagePerConnection = 0, 
ServerCnx#handleSend
+             * drops any send while another is in-flight, returning MessageId 
with entryId = -1.
+             * Awaitility repeats whole bursts (bounded to 20s) until a drop 
is observed.
+             */
+            AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+                CyclicBarrier barrier = new CyclicBarrier(threads);
+                CountDownLatch completionLatch = new CountDownLatch(threads);
+                AtomicReference<Throwable> error = new AtomicReference<>();
+                publisherDropSeen.set(false);
+
+                for (int i = 0; i < threads; i++) {
+                    executor.submit(() -> {
+                        try {
+                            barrier.await();
+                            MessageId msgId = producer.send(msgData);
+                            // Publisher drop is signaled by 
MessageIdImpl.entryId == -1
+                            if (msgId instanceof MessageIdImpl && 
((MessageIdImpl) msgId).getEntryId() == -1) {
+                                publisherDropSeen.set(true);
+                            }
+                        } catch (Throwable t) {
+                            if (t instanceof InterruptedException) {
+                                Thread.currentThread().interrupt();
+                            }
+                            error.compareAndSet(null, t);
+                        } finally {
+                            completionLatch.countDown();
                         }
+                    });
+                }
 
-                        Thread.sleep(10);
-                    } catch (PulsarClientException e) {
-                        throw new RuntimeException(e);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    }
-                });
-            }
-            assertTrue(latch.await(5, TimeUnit.SECONDS));
+                // Wait for all sends to complete.
+                assertTrue(completionLatch.await(20, TimeUnit.SECONDS));
+
+                assertNull(error.get(), "Concurrent send encountered an 
exception");
+                return publisherDropSeen.get();
+            });
+
+            assertTrue(publisherDropSeen.get(), "Expected at least one 
publisher drop (entryId == -1)");
 
             NonPersistentTopic topic =
                     (NonPersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();

Reply via email to