This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 39f14bd1b5bdf4c0c0e2916d07234f6c0f51ee36 Author: Jiwei Guo <[email protected]> AuthorDate: Fri Sep 16 15:51:40 2022 +0800 [fix][test] Fix flaky test ResourceGroupUsageAggregationTest. testProduceConsumeUsageOnRG (#17617) --- .../broker/resourcegroup/ResourceGroupService.java | 11 +++ .../ResourceGroupUsageAggregationTest.java | 105 ++++++++++++--------- 2 files changed, 70 insertions(+), 46 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 79cba28d374..4bb1bc8ab24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.resourcegroup; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.annotations.VisibleForTesting; import io.prometheus.client.Counter; import io.prometheus.client.Summary; import java.util.Map; @@ -829,4 +830,14 @@ public class ResourceGroupService { .name("pulsar_resource_group_calculate_quota_secs") .help("Time required to calculate quota of all resource groups, in seconds.") .register(); + + @VisibleForTesting + ConcurrentHashMap getTopicConsumeStats() { + return this.topicConsumeStats; + } + + @VisibleForTesting + ConcurrentHashMap getTopicProduceStats() { + return this.topicProduceStats; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java index a86035b71e0..1fd9271d9bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java @@ -24,6 +24,8 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCoun import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -37,16 +39,16 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; - -import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @Slf4j -@Test(groups = "flaky") public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase { @BeforeClass @Override @@ -120,10 +122,11 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase { .create(); Consumer<byte[]> consumer = null; + String subscriptionName = "my-subscription"; try { consumer = pulsarClient.newConsumer() .topic(topicString) - .subscriptionName("my-subscription") + .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .subscribe(); } catch (PulsarClientException p) { @@ -176,6 +179,20 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase { true, true); consumer.close(); + // cleanup the topic data. + CompletableFuture<Optional<Topic>> topicFuture = pulsar.getBrokerService().getTopics().remove(topicString); + if (topicFuture != null) { + Optional<Topic> optTopic = topicFuture.join(); + if (optTopic.isPresent()) { + Topic topic = optTopic.get(); + if (topic instanceof PersistentTopic) { + PersistentTopic persistentTopic = (PersistentTopic) topic; + persistentTopic.getSubscription(subscriptionName).deleteForcefully(); + } + } + } + rgs.getTopicConsumeStats().clear(); + rgs.getTopicProduceStats().clear(); rgs.unRegisterTenant(activeRgName, tenantString); rgs.unRegisterNameSpace(activeRgName, NamespaceName.get(nsString)); @@ -192,48 +209,44 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase { boolean checkProduce, boolean checkConsume) throws InterruptedException, PulsarAdminException { BrokerService bs = pulsar.getBrokerService(); - Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats(); - for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) { - String mapTopicName = entry.getKey(); - if (mapTopicName.equals(topicString)) { - TopicStatsImpl stats = entry.getValue(); - if (checkProduce) { - Assert.assertTrue(stats.bytesInCounter >= sentNumBytes); - Assert.assertEquals(sentNumMsgs, stats.msgInCounter); - } - if (checkConsume) { - Assert.assertTrue(stats.bytesOutCounter >= recvdNumBytes); - Assert.assertEquals(recvdNumMsgs, stats.msgOutCounter); - } - - if (sentNumMsgs > 0 || recvdNumMsgs > 0) { - rgs.aggregateResourceGroupLocalUsages(); // hack to ensure aggregator calculation without waiting - BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish, - ResourceGroupUsageStatsType.Cumulative); - BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch, - ResourceGroupUsageStatsType.Cumulative); - - // Re-do the getRGUsage. - // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC. - BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish, - ResourceGroupUsageStatsType.Cumulative); - BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch, - ResourceGroupUsageStatsType.Cumulative); - - Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes); - Assert.assertEquals(prodCounts1.messages, prodCounts.messages); - Assert.assertEquals(consCounts1.bytes, consCounts.bytes); - Assert.assertEquals(consCounts1.messages, consCounts.messages); - - if (checkProduce) { - Assert.assertTrue(prodCounts.bytes >= sentNumBytes); - Assert.assertEquals(sentNumMsgs, prodCounts.messages); - } - if (checkConsume) { - Assert.assertTrue(consCounts.bytes >= recvdNumBytes); - Assert.assertEquals(recvdNumMsgs, consCounts.messages); - } - } + Awaitility.await().untilAsserted(() -> { + TopicStatsImpl topicStats = bs.getTopicStats().get(topicString); + Assert.assertNotNull(topicStats); + if (checkProduce) { + Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes); + Assert.assertEquals(sentNumMsgs, topicStats.msgInCounter); + } + if (checkConsume) { + Assert.assertTrue(topicStats.bytesOutCounter >= recvdNumBytes); + Assert.assertEquals(recvdNumMsgs, topicStats.msgOutCounter); + } + }); + if (sentNumMsgs > 0 || recvdNumMsgs > 0) { + rgs.aggregateResourceGroupLocalUsages(); // hack to ensure aggregator calculation without waiting + BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish, + ResourceGroupUsageStatsType.Cumulative); + BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch, + ResourceGroupUsageStatsType.Cumulative); + + // Re-do the getRGUsage. + // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC. + BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish, + ResourceGroupUsageStatsType.Cumulative); + BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch, + ResourceGroupUsageStatsType.Cumulative); + + Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes); + Assert.assertEquals(prodCounts1.messages, prodCounts.messages); + Assert.assertEquals(consCounts1.bytes, consCounts.bytes); + Assert.assertEquals(consCounts1.messages, consCounts.messages); + + if (checkProduce) { + Assert.assertTrue(prodCounts.bytes >= sentNumBytes); + Assert.assertEquals(sentNumMsgs, prodCounts.messages); + } + if (checkConsume) { + Assert.assertTrue(consCounts.bytes >= recvdNumBytes); + Assert.assertEquals(recvdNumMsgs, consCounts.messages); } } }
