This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 25c4b7cee40 [fix][test] Fix flaky test 
NonPersistentTopicTest.testMsgDropStat, follow up (#20401)
25c4b7cee40 is described below

commit 25c4b7cee402a1d486d720a71dc2e06aa6d9af64
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Thu May 25 19:28:18 2023 +0300

    [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, follow 
up (#20401)
    
    Fixes #20386
    
    ### Motivation
    
    - the previous attempt #20387 to fix the flakiness wasn't effective
    
    ### Modifications
    
    - improve the fix and rely on the fact that entryId is -1 when the message 
is dropped in the broker
      code: 
https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1699-L1711
    
    ### Documentation
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc` <!-- Your PR contains doc changes. -->
    - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
    - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
    - [ ] `doc-complete` <!-- Docs have been already added -->
---
 .../pulsar/client/api/NonPersistentTopicTest.java  | 30 +++++++++++++++-------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index c41ab3e8ccc..63ce0f00dff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -50,6 +51,7 @@ import 
org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
@@ -824,13 +826,14 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
             conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
             stopBroker();
             startBroker();
+
+            pulsar.getBrokerService().updateRates();
+
             Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
-                    .receiverQueueSize(1)
-                    .messageListener((c, msg) -> {}).subscribe();
+                    .receiverQueueSize(1).subscribe();
 
             Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
-                    
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
-                    .messageListener((c, msg) -> {}).subscribe();
+                    
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();
 
             ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
                 .enableBatching(false)
@@ -839,17 +842,26 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
             @Cleanup("shutdownNow")
             ExecutorService executor = Executors.newFixedThreadPool(5);
             byte[] msgData = "testData".getBytes();
-            final int totalProduceMessages = 200;
-            CountDownLatch latch = new CountDownLatch(totalProduceMessages);
+            final int totalProduceMessages = 1000;
+            CountDownLatch latch = new CountDownLatch(1);
+            AtomicInteger messagesSent = new AtomicInteger(0);
             for (int i = 0; i < totalProduceMessages; i++) {
                 executor.submit(() -> {
-                    producer.sendAsync(msgData).handle((msg, e) -> {
-                        latch.countDown();
+                    producer.sendAsync(msgData).handle((msgId, e) -> {
+                        int count = messagesSent.incrementAndGet();
+                        // process at least 20% of messages before signalling 
the latch
+                        // a non-persistent message will return entryId as -1 
when it has been dropped
+                        // due to 
setMaxConcurrentNonPersistentMessagePerConnection limit
+                        // also ensure that it has happened before the latch 
is signalled
+                        if (count > totalProduceMessages * 0.2 && msgId != null
+                                && ((MessageIdImpl) msgId).getEntryId() == -1) 
{
+                            latch.countDown();
+                        }
                         return null;
                     });
                 });
             }
-            latch.await();
+            latch.await(5, TimeUnit.SECONDS);
 
             NonPersistentTopic topic =
                     (NonPersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();

Reply via email to