This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new bfd53f4cbfc [fix][test] Adjust flaky test concurrent consume reconnect (#15544)
bfd53f4cbfc is described below

commit bfd53f4cbfce4880814389ff89ed85c21af224af
Author: ran <gaoran...@126.com>
AuthorDate: Wed May 11 20:41:32 2022 +0800

    [fix][test] Adjust flaky test concurrent consume reconnect (#15544)

    (cherry picked from commit 7070397649bc41fcd5e9070695ab2790228aa9c2)
---
 .../apache/pulsar/client/api/SimpleProducerConsumerTest.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 8a47c6fbb62..24302a16f8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -820,10 +820,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
     // This is to test that the flow control counter doesn't get corrupted while concurrent receives during
     // reconnections
-    @Test(dataProvider = "batch", groups = "quarantine")
+    @Test(timeOut = 100_000, dataProvider = "batch", groups = "quarantine")
     public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) throws Exception {
         final int recvQueueSize = 100;
         final int numConsumersThreads = 10;
+        final int receiveTimeoutSeconds = 100;
 
         String subName = UUID.randomUUID().toString();
         final Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -837,12 +838,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         for (int i = 0; i < numConsumersThreads; i++) {
             executor.submit((Callable<Void>) () -> {
                 barrier.await();
-                consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                consumer.receive(receiveTimeoutSeconds, TimeUnit.SECONDS);
                 return null;
             });
         }
-
-        barrier.await();
+        barrier.await(); // the last thread reach barrier, start consume messages
 
         // we restart the broker to reconnect
         restartBroker();
@@ -878,7 +878,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 return null;
             });
         }
-        barrier.await();
+        barrier.await(); // the last thread reach barrier, start consume messages
 
         Awaitility.await().untilAsserted(() -> {
             // The available permits should be 20 and num messages in the queue should be 80
@@ -908,7 +908,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 return null;
             });
         }
-        barrier.await();
+        barrier.await(); // the last thread reach barrier, start consume messages
 
         restartBroker();