This is an automated email from the ASF dual-hosted git repository. mmarshall pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 25c4b7cee40 [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, follow up (#20401) 25c4b7cee40 is described below commit 25c4b7cee402a1d486d720a71dc2e06aa6d9af64 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Thu May 25 19:28:18 2023 +0300 [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, follow up (#20401) Fixes #20386 ### Motivation - the previous attempt #20387 to fix the flakiness wasn't effective ### Modifications - improve the fix and rely on the fact that entryId is -1 when the message is dropped in the broker code: https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1699-L1711 ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> --- .../pulsar/client/api/NonPersistentTopicTest.java | 30 +++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index c41ab3e8ccc..63ce0f00dff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -50,6 +51,7 @@ import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; import org.apache.pulsar.client.impl.ProducerImpl; @@ -824,13 +826,14 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { conf.setMaxConcurrentNonPersistentMessagePerConnection(1); stopBroker(); startBroker(); + + pulsar.getBrokerService().updateRates(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1") - .receiverQueueSize(1) - .messageListener((c, msg) -> {}).subscribe(); + .receiverQueueSize(1).subscribe(); Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2") - .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared) - .messageListener((c, msg) -> {}).subscribe(); + .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe(); ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) .enableBatching(false) @@ -839,17 +842,26 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(5); byte[] msgData = "testData".getBytes(); - final int totalProduceMessages = 200; - CountDownLatch latch = new CountDownLatch(totalProduceMessages); + final int totalProduceMessages = 1000; + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger messagesSent = new AtomicInteger(0); for (int i = 0; i < totalProduceMessages; i++) { executor.submit(() -> { - producer.sendAsync(msgData).handle((msg, e) -> { - latch.countDown(); + producer.sendAsync(msgData).handle((msgId, e) -> { + int count = messagesSent.incrementAndGet(); + // process at least 20% of messages before signalling the latch + // a non-persistent message will return entryId as -1 when it has been dropped + // due to setMaxConcurrentNonPersistentMessagePerConnection limit + // also ensure that it has happened before the latch is signalled + if (count > totalProduceMessages * 0.2 && msgId != null + && ((MessageIdImpl) msgId).getEntryId() == -1) { + latch.countDown(); + } return null; }); }); } - latch.await(); + latch.await(5, TimeUnit.SECONDS); NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();