This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 903425be3af [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat (#20387)
903425be3af is described below
commit 903425be3afbbcfeb0a7213b0b7d04afe5868151
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 24 19:35:15 2023 +0300
[fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat (#20387)
---
.../pulsar/client/api/NonPersistentTopicTest.java | 29 ++++++++++++++--------
1 file changed, 18 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 85274064964..c41ab3e8ccc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -824,10 +825,12 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
stopBroker();
startBroker();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
- .receiverQueueSize(1).subscribe();
+ .receiverQueueSize(1)
+ .messageListener((c, msg) -> {}).subscribe();
Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
-
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();
+
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
+ .messageListener((c, msg) -> {}).subscribe();
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
@@ -848,15 +851,19 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
}
latch.await();
- NonPersistentTopic topic = (NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
- pulsar.getBrokerService().updateRates();
- NonPersistentTopicStats stats = topic.getStats(false, false,
false);
- NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
- NonPersistentSubscriptionStats sub1Stats =
stats.getSubscriptions().get("subscriber-1");
- NonPersistentSubscriptionStats sub2Stats =
stats.getSubscriptions().get("subscriber-2");
- assertTrue(npStats.getMsgDropRate() > 0);
- assertTrue(sub1Stats.getMsgDropRate() > 0);
- assertTrue(sub2Stats.getMsgDropRate() > 0);
+ NonPersistentTopic topic =
+ (NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+ Awaitility.await().untilAsserted(() -> {
+ pulsar.getBrokerService().updateRates();
+ NonPersistentTopicStats stats = topic.getStats(false, false,
false);
+ NonPersistentPublisherStats npStats =
stats.getPublishers().get(0);
+ NonPersistentSubscriptionStats sub1Stats =
stats.getSubscriptions().get("subscriber-1");
+ NonPersistentSubscriptionStats sub2Stats =
stats.getSubscriptions().get("subscriber-2");
+ assertTrue(npStats.getMsgDropRate() > 0);
+ assertTrue(sub1Stats.getMsgDropRate() > 0);
+ assertTrue(sub2Stats.getMsgDropRate() > 0);
+ });
producer.close();
consumer.close();