This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 893dea9540409b75d253fc490d738acc3d8384b7
Author: Oneby Wang <[email protected]>
AuthorDate: Fri May 22 22:10:26 2026 +0800

    [fix][test] Fix flaky ResendRequestTest.testSharedSingleAckedPartitionedTopic() test (#25852)
    
    (cherry picked from commit 11f7bcdb4ff7fd10d1b278626609c7bd07cc9c2f)
    (cherry picked from commit 4a5bf97af4066c99050e1fd4c988ea3854afd6c8)
---
 .../pulsar/broker/service/ResendRequestTest.java   | 60 ++++++++++++----------
 1 file changed, 32 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index fa1dd2489b7..df49b75ef36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -35,6 +36,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -488,36 +490,41 @@ public class ResendRequestTest extends BrokerTestBase {
         // Special step to create partitioned topic
 
         // 1. producer connect
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
-
-        // 2. Create consumer
-        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
-
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
-        Consumer<byte[]> consumer2 = 
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
+                .enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
-        // 3. producer publish messages
+        // 2. producer publish messages
         for (int i = 0; i < totalMessages; i++) {
             String message = messagePredicate + i;
             log.info("Message produced: " + message);
             producer.send(message.getBytes());
         }
 
+        // 3. Create consumer
+        @Cleanup
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                
.receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(
+                        SubscriptionInitialPosition.Earliest).subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);
+        Consumer<byte[]> consumer2 = 
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                
.receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(
+                        SubscriptionInitialPosition.Earliest).subscribe();
+
         // 4. Receive messages
-        // Use timeouts on the initial receives — with a Shared subscription 
the broker may
-        // dispatch all messages to a single consumer (receiverQueueSize is 
larger than the
-        // number of messages per partition), and a blocking receive() would 
hang.
-        Message<byte[]> message1 = consumer1.receive(5000, 
TimeUnit.MILLISECONDS);
-        Message<byte[]> message2 = consumer2.receive(5000, 
TimeUnit.MILLISECONDS);
         int messageCount1 = 0;
         int messageCount2 = 0;
         int ackCount1 = 0;
         int ackCount2 = 0;
-        do {
+        while (true) {
+            Message<byte[]> message1 = consumer1.receive(500, 
TimeUnit.MILLISECONDS);
+            Message<byte[]> message2 = consumer2.receive(500, 
TimeUnit.MILLISECONDS);
+            if (message1 == null && message2 == null) {
+                break;
+            }
             if (message1 != null) {
                 log.info("Consumer1 received " + new 
String(message1.getData()));
                 messageCount1 += 1;
@@ -536,9 +543,7 @@ public class ResendRequestTest extends BrokerTestBase {
                     ackCount2 += 1;
                 }
             }
-            message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
+        }
         log.info(key + " messageCount1 = " + messageCount1);
         log.info(key + " messageCount2 = " + messageCount2);
         log.info(key + " ackCount1 = " + ackCount1);
@@ -553,10 +558,13 @@ public class ResendRequestTest extends BrokerTestBase {
         }
 
         // 6. Check if Messages redelivered again
-        message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS);
-        message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS);
         messageCount1 = 0;
-        do {
+        while (true) {
+            Message<byte[]> message1 = consumer1.receive(500, 
TimeUnit.MILLISECONDS);
+            Message<byte[]> message2 = consumer2.receive(500, 
TimeUnit.MILLISECONDS);
+            if (message1 == null && message2 == null) {
+                break;
+            }
             if (message1 != null) {
                 log.info("Consumer1 received " + new 
String(message1.getData()));
                 messageCount1 += 1;
@@ -565,15 +573,11 @@ public class ResendRequestTest extends BrokerTestBase {
                 log.info("Consumer2 received " + new 
String(message2.getData()));
                 messageCount2 += 1;
             }
-            message1 = consumer1.receive(1000, TimeUnit.MILLISECONDS);
-            message2 = consumer2.receive(1000, TimeUnit.MILLISECONDS);
-        } while (message1 != null || message2 != null);
-
+        }
         log.info(key + " messageCount1 = " + messageCount1);
         log.info(key + " messageCount2 = " + messageCount2);
         log.info(key + " ackCount1 = " + ackCount1);
         log.info(key + " ackCount2 = " + ackCount2);
-        newPulsarClient.close();
         assertEquals(messageCount1 + messageCount2 + ackCount1, totalMessages);
     }
 

Reply via email to