This is an automated email from the ASF dual-hosted git repository. jianghaiting pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7be144f51b51b08474b79ec3a3ec4fcc686bc908 Author: fengyubiao <[email protected]> AuthorDate: Fri Aug 19 07:43:47 2022 +0800 [fix][flay-test]BrokerInterceptorTest.testProducerCreation (#17159) (cherry picked from commit 84968e84c109ccc690fdd5779d2f09a50cd9da24) --- .../pulsar/broker/intercept/BrokerInterceptorTest.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java index d0c163f4356..0e46b147e7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.nar.NarClassLoader; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -111,7 +112,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { BrokerInterceptor listener = pulsar.getBrokerInterceptor(); Assert.assertTrue(listener instanceof CounterBrokerInterceptor); admin.namespaces().createNamespace("public/test", 4); - Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 1); + Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getCount() >= 1); } @Test @@ -120,7 +121,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { Assert.assertTrue(listener instanceof CounterBrokerInterceptor); pulsarClient.newProducer(Schema.BOOL).topic("test").create(); // CONNECT and PRODUCER - Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 2); + Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getCount() >= 2); } @Test @@ -130,7 +131,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { pulsarClient.newProducer(Schema.BOOL).topic("test").create(); pulsarClient.newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe(); // single connection for both producer and consumer - Assert.assertTrue(((CounterBrokerInterceptor)listener).getConnectionCreationCount() == 1); + Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConnectionCreationCount() == 1); } @Test @@ -139,7 +140,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { Assert.assertTrue(listener instanceof CounterBrokerInterceptor); Assert.assertTrue(((CounterBrokerInterceptor)listener).getProducerCount() == 0); pulsarClient.newProducer(Schema.BOOL).topic("test").create(); - Assert.assertTrue(((CounterBrokerInterceptor)listener).getProducerCount() == 1); + Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 1); } @Test @@ -148,7 +149,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { Assert.assertTrue(listener instanceof CounterBrokerInterceptor); Assert.assertTrue(((CounterBrokerInterceptor)listener).getConsumerCount() == 0); pulsarClient.newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe(); - Assert.assertTrue(((CounterBrokerInterceptor)listener).getConsumerCount() == 1); + Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1); } @Test @@ -175,8 +176,8 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { assertEquals(msg.getValue(), "hello world"); - assertEquals(((CounterBrokerInterceptor) listener).getBeforeSendCount(), 1); - assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),1); + Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1); + Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getMessageDispatchCount() == 1); } @Test @@ -205,6 +206,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { } }); future.get(); + Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty()); CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0); Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000"); Assert.assertEquals(responseEvent.getResponseStatus(),
