This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f7bcdb4ff [fix][test] Fix flaky ResendRequestTest.testSharedSingleAckedPartitionedTopic() test (#25852)
11f7bcdb4ff is described below

commit 11f7bcdb4ff7fd10d1b278626609c7bd07cc9c2f
Author: Oneby Wang <[email protected]>
AuthorDate: Fri May 22 22:10:26 2026 +0800

    [fix][test] Fix flaky ResendRequestTest.testSharedSingleAckedPartitionedTopic() test (#25852)
---
 .../pulsar/broker/service/ResendRequestTest.java   | 59 ++++++++++++----------
 1 file changed, 31 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 4292b9236c8..4f31d4c8701 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
@@ -492,37 +493,41 @@ public class ResendRequestTest extends SharedPulsarBaseTest {
         Random rn = new Random();
 
         // 1. producer connect
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
-
-        // 2. Create consumer
-        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
-
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient();
-        Consumer<byte[]> consumer2 = 
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
-        // 3. producer publish messages
+        // 2. producer publish messages
         for (int i = 0; i < totalMessages; i++) {
             String message = messagePredicate + i;
             log.info().attr("message", message).log("Message produced");
             producer.send(message.getBytes());
         }
 
+        // 3. Create consumer
+        @Cleanup
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                
.receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(
+                        SubscriptionInitialPosition.Earliest).subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient();
+        Consumer<byte[]> consumer2 = 
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                
.receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(
+                        SubscriptionInitialPosition.Earliest).subscribe();
+
         // 4. Receive messages
-        // Use timeouts on the initial receives — with a Shared subscription 
the broker may
-        // dispatch all messages to a single consumer (receiverQueueSize is 
larger than the
-        // number of messages per partition), and a blocking receive() would 
hang.
-        Message<byte[]> message1 = consumer1.receive(5000, 
TimeUnit.MILLISECONDS);
-        Message<byte[]> message2 = consumer2.receive(5000, 
TimeUnit.MILLISECONDS);
         int messageCount1 = 0;
         int messageCount2 = 0;
         int ackCount1 = 0;
         int ackCount2 = 0;
-        do {
+        while (true) {
+            Message<byte[]> message1 = consumer1.receive(500, 
TimeUnit.MILLISECONDS);
+            Message<byte[]> message2 = consumer2.receive(500, 
TimeUnit.MILLISECONDS);
+            if (message1 == null && message2 == null) {
+                break;
+            }
             if (message1 != null) {
                 log.info().attr("data", new 
String(message1.getData())).log("Consumer1 received");
                 messageCount1 += 1;
@@ -541,9 +546,7 @@ public class ResendRequestTest extends SharedPulsarBaseTest {
                     ackCount2 += 1;
                 }
             }
-            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
+        }
         log.info().attr("messageCount1", messageCount1).log("messageCount1");
         log.info().attr("messageCount2", messageCount2).log("messageCount2");
         log.info().attr("ackCount1", ackCount1).log("ackCount1");
@@ -558,10 +561,13 @@ public class ResendRequestTest extends SharedPulsarBaseTest {
         }
 
         // 6. Check if Messages redelivered again
-        message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS);
-        message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS);
         messageCount1 = 0;
-        do {
+        while (true) {
+            Message<byte[]> message1 = consumer1.receive(500, 
TimeUnit.MILLISECONDS);
+            Message<byte[]> message2 = consumer2.receive(500, 
TimeUnit.MILLISECONDS);
+            if (message1 == null && message2 == null) {
+                break;
+            }
             if (message1 != null) {
                 log.info().attr("data", new 
String(message1.getData())).log("Consumer1 received");
                 messageCount1 += 1;
@@ -570,10 +576,7 @@ public class ResendRequestTest extends SharedPulsarBaseTest {
                 log.info().attr("data", new 
String(message2.getData())).log("Consumer2 received");
                 messageCount2 += 1;
             }
-            message1 = consumer1.receive(1000, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(1000, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
-
+        }
         log.info().attr("messageCount1", messageCount1).log("messageCount1");
         log.info().attr("messageCount2", messageCount2).log("messageCount2");
         log.info().attr("ackCount1", ackCount1).log("ackCount1");

Reply via email to