This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ca17972459 [fix][test]fix flaky SimpleProducerConsumerTest.testReceiveAsyncCompletedWhenClosing (#24858)
1ca17972459 is described below

commit 1ca17972459095278e2b5f7ed7fd55c8921d8826
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Oct 28 09:17:19 2025 +0800

    [fix][test]fix flaky SimpleProducerConsumerTest.testReceiveAsyncCompletedWhenClosing (#24858)
---
 .../client/api/SimpleProducerConsumerTest.java     | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 8a2ae2488e7..39f5a62306e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4043,14 +4043,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         // 1) Test receiveAsync is interrupted
         CountDownLatch countDownLatch = new CountDownLatch(1);
         new Thread(() -> {
+            CountDownLatch subCountDownLatch = new CountDownLatch(1);
             try {
                 new Thread(() -> {
                     try {
+                        subCountDownLatch.await();
                         consumer.close();
-                    } catch (PulsarClientException ignore) {
+                    } catch (PulsarClientException | InterruptedException ignore) {
                     }
                 }).start();
-                consumer.receiveAsync().get();
+                CompletableFuture<Message<String>> futhre = consumer.receiveAsync();
+                subCountDownLatch.countDown();
+                futhre.get();
                 Assert.fail("should be interrupted");
             } catch (Exception e) {
                 Assert.assertTrue(e.getMessage().contains(errorMsg));
@@ -4067,13 +4071,17 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 .batchReceivePolicy(batchReceivePolicy).subscribe();
         new Thread(() -> {
             try {
+                CountDownLatch subCountDownLatch = new CountDownLatch(1);
                 new Thread(() -> {
                     try {
+                        subCountDownLatch.await();
                         consumer2.close();
-                    } catch (PulsarClientException ignore) {
+                    } catch (PulsarClientException | InterruptedException ignore) {
                     }
                 }).start();
-                consumer2.batchReceiveAsync().get();
+                CompletableFuture<Messages<String>> future = consumer2.batchReceiveAsync();
+                subCountDownLatch.countDown();
+                future.get();
                 Assert.fail("should be interrupted");
             } catch (Exception e) {
                 Assert.assertTrue(e.getMessage().contains(errorMsg));
@@ -4090,13 +4098,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 .batchReceivePolicy(batchReceivePolicy).subscribe();
         new Thread(() -> {
             try {
+                CountDownLatch subCountDownLatch = new CountDownLatch(1);
                 new Thread(() -> {
                     try {
+                        subCountDownLatch.await();
                         partitionedTopicConsumer.close();
-                    } catch (PulsarClientException ignore) {
+                    } catch (PulsarClientException | InterruptedException ignore) {
                     }
                 }).start();
-                partitionedTopicConsumer.batchReceiveAsync().get();
+                CompletableFuture<Messages<String>> future =
+                        partitionedTopicConsumer.batchReceiveAsync();
+                subCountDownLatch.countDown();
+                future.get();
                 Assert.fail("should be interrupted");
             } catch (Exception e) {
                 Assert.assertTrue(e.getMessage().contains(errorMsg));

Reply via email to