This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d66e5eeab48 [fix][test] Stabilize testMsgDropStat by reliably 
triggering non-persistent publisher drop (#24929)
d66e5eeab48 is described below

commit d66e5eeab48c9b8a534ec70502e3f23fdb9358c0
Author: Vinkal <[email protected]>
AuthorDate: Tue Nov 4 15:46:28 2025 +0530

    [fix][test] Stabilize testMsgDropStat by reliably triggering non-persistent 
publisher drop (#24929)
    
    Signed-off-by: Vinkal Chudgar <[email protected]>
    (cherry picked from commit 60acfba3aec83f7cb4b6aebb274d203893b4b65b)
---
 .../pulsar/client/api/NonPersistentTopicTest.java  | 97 ++++++++++++++++------
 1 file changed, 73 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 85274064964..fd18949a2e6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -28,17 +28,21 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.net.URL;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -50,6 +54,7 @@ import 
org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
@@ -66,6 +71,7 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -813,14 +819,18 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
      *
      * @throws Exception
      */
-    @Test
+    @Test(timeOut = 60000)
     public void testMsgDropStat() throws Exception {
 
         int defaultNonPersistentMessageRate = 
conf.getMaxConcurrentNonPersistentMessagePerConnection();
         try {
-            final String topicName = 
"non-persistent://my-property/my-ns/stats-topic";
-            // restart broker with lower publish rate limit
-            conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
+            final String topicName = 
BrokerTestUtil.newUniqueName("non-persistent://my-property/my-ns/stats-topic");
+
+            // For non-persistent topics, set the per-connection in-flight 
limit to 0.
+            // Since ServerCnx drops when inFlight > max; with max=0, any 
second overlapping send on the
+            // same connection is dropped (entryId == -1) and recorded. This 
makes observing a publisher drop
+            // reliable in this test.
+            conf.setMaxConcurrentNonPersistentMessagePerConnection(0);
             stopBroker();
             startBroker();
             Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
@@ -833,30 +843,69 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
+
+            final int threads = 10;
             @Cleanup("shutdownNow")
-            ExecutorService executor = Executors.newFixedThreadPool(5);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
             byte[] msgData = "testData".getBytes();
-            final int totalProduceMessages = 200;
-            CountDownLatch latch = new CountDownLatch(totalProduceMessages);
-            for (int i = 0; i < totalProduceMessages; i++) {
-                executor.submit(() -> {
-                    producer.sendAsync(msgData).handle((msg, e) -> {
-                        latch.countDown();
-                        return null;
+
+            /*
+             * Trigger at least one publisher drop through concurrent send() 
calls.
+             *
+             * Uses CyclicBarrier to ensure all threads send simultaneously, 
creating overlap.
+             * With maxConcurrentNonPersistentMessagePerConnection = 0, 
ServerCnx#handleSend
+             * drops any send while another is in-flight, returning MessageId 
with entryId = -1.
+             * Awaitility repeats whole bursts (bounded to 20s) until a drop 
is observed.
+             */
+            AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+                CyclicBarrier barrier = new CyclicBarrier(threads);
+                CountDownLatch completionLatch = new CountDownLatch(threads);
+                AtomicReference<Throwable> error = new AtomicReference<>();
+                publisherDropSeen.set(false);
+
+                for (int i = 0; i < threads; i++) {
+                    executor.submit(() -> {
+                        try {
+                            barrier.await();
+                            MessageId msgId = producer.send(msgData);
+                            // Publisher drop is signaled by 
MessageIdImpl.entryId == -1
+                            if (msgId instanceof MessageIdImpl && 
((MessageIdImpl) msgId).getEntryId() == -1) {
+                                publisherDropSeen.set(true);
+                            }
+                        } catch (Throwable t) {
+                            if (t instanceof InterruptedException) {
+                                Thread.currentThread().interrupt();
+                            }
+                            error.compareAndSet(null, t);
+                        } finally {
+                            completionLatch.countDown();
+                        }
                     });
-                });
-            }
-            latch.await();
+                }
+
+                // Wait for all sends to complete.
+                assertTrue(completionLatch.await(20, TimeUnit.SECONDS));
+
+                assertNull(error.get(), "Concurrent send encountered an 
exception");
+                return publisherDropSeen.get();
+            });
+
+            assertTrue(publisherDropSeen.get(), "Expected at least one 
publisher drop (entryId == -1)");
+
+            NonPersistentTopic topic =
+                    (NonPersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
 
-            NonPersistentTopic topic = (NonPersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
-            pulsar.getBrokerService().updateRates();
-            NonPersistentTopicStats stats = topic.getStats(false, false, 
false);
-            NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
-            NonPersistentSubscriptionStats sub1Stats = 
stats.getSubscriptions().get("subscriber-1");
-            NonPersistentSubscriptionStats sub2Stats = 
stats.getSubscriptions().get("subscriber-2");
-            assertTrue(npStats.getMsgDropRate() > 0);
-            assertTrue(sub1Stats.getMsgDropRate() > 0);
-            assertTrue(sub2Stats.getMsgDropRate() > 0);
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                pulsar.getBrokerService().updateRates();
+                NonPersistentTopicStats stats = topic.getStats(false, false, 
false);
+                NonPersistentPublisherStats npStats = 
stats.getPublishers().get(0);
+                NonPersistentSubscriptionStats sub1Stats = 
stats.getSubscriptions().get("subscriber-1");
+                NonPersistentSubscriptionStats sub2Stats = 
stats.getSubscriptions().get("subscriber-2");
+                assertTrue(npStats.getMsgDropRate() > 0);
+                assertTrue(sub1Stats.getMsgDropRate() > 0);
+                assertTrue(sub2Stats.getMsgDropRate() > 0);
+            });
 
             producer.close();
             consumer.close();

Reply via email to