This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1ca17972459 [fix][test]fix flaky SimpleProducerConsumerTest.testReceiveAsyncCompletedWhenClosing (#24858)
1ca17972459 is described below
commit 1ca17972459095278e2b5f7ed7fd55c8921d8826
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Oct 28 09:17:19 2025 +0800
[fix][test]fix flaky SimpleProducerConsumerTest.testReceiveAsyncCompletedWhenClosing (#24858)
---
.../client/api/SimpleProducerConsumerTest.java | 25 ++++++++++++++++------
1 file changed, 19 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 8a2ae2488e7..39f5a62306e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4043,14 +4043,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// 1) Test receiveAsync is interrupted
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {
+ CountDownLatch subCountDownLatch = new CountDownLatch(1);
try {
new Thread(() -> {
try {
+ subCountDownLatch.await();
consumer.close();
- } catch (PulsarClientException ignore) {
+ } catch (PulsarClientException | InterruptedException
ignore) {
}
}).start();
- consumer.receiveAsync().get();
+ CompletableFuture<Message<String>> futhre =
consumer.receiveAsync();
+ subCountDownLatch.countDown();
+ futhre.get();
Assert.fail("should be interrupted");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(errorMsg));
@@ -4067,13 +4071,17 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
.batchReceivePolicy(batchReceivePolicy).subscribe();
new Thread(() -> {
try {
+ CountDownLatch subCountDownLatch = new CountDownLatch(1);
new Thread(() -> {
try {
+ subCountDownLatch.await();
consumer2.close();
- } catch (PulsarClientException ignore) {
+ } catch (PulsarClientException | InterruptedException
ignore) {
}
}).start();
- consumer2.batchReceiveAsync().get();
+ CompletableFuture<Messages<String>> future =
consumer2.batchReceiveAsync();
+ subCountDownLatch.countDown();
+ future.get();
Assert.fail("should be interrupted");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(errorMsg));
@@ -4090,13 +4098,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
.batchReceivePolicy(batchReceivePolicy).subscribe();
new Thread(() -> {
try {
+ CountDownLatch subCountDownLatch = new CountDownLatch(1);
new Thread(() -> {
try {
+ subCountDownLatch.await();
partitionedTopicConsumer.close();
- } catch (PulsarClientException ignore) {
+ } catch (PulsarClientException | InterruptedException
ignore) {
}
}).start();
- partitionedTopicConsumer.batchReceiveAsync().get();
+ CompletableFuture<Messages<String>> future =
+ partitionedTopicConsumer.batchReceiveAsync();
+ subCountDownLatch.countDown();
+ future.get();
Assert.fail("should be interrupted");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(errorMsg));