This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 893dea9540409b75d253fc490d738acc3d8384b7 Author: Oneby Wang <[email protected]> AuthorDate: Fri May 22 22:10:26 2026 +0800 [fix][test] Fix flaky ResendRequestTest.testSharedSingleAckedPartitionedTopic() test (#25852) (cherry picked from commit 11f7bcdb4ff7fd10d1b278626609c7bd07cc9c2f) (cherry picked from commit 4a5bf97af4066c99050e1fd4c988ea3854afd6c8) --- .../pulsar/broker/service/ResendRequestTest.java | 60 ++++++++++++---------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java index fa1dd2489b7..df49b75ef36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -35,6 +36,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -488,36 +490,41 @@ public class ResendRequestTest extends BrokerTestBase { // Special step to create partitioned topic // 1. producer connect + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); - - // 2. Create consumer - Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) - .receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe(); - - PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0); // Creates new client connection - Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) - .receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); - // 3. producer publish messages + // 2. producer publish messages for (int i = 0; i < totalMessages; i++) { String message = messagePredicate + i; log.info("Message produced: " + message); producer.send(message.getBytes()); } + // 3. Create consumer + @Cleanup + Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) + .receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition( + SubscriptionInitialPosition.Earliest).subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0); + Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) + .receiverQueueSize(2).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition( + SubscriptionInitialPosition.Earliest).subscribe(); + // 4. Receive messages - // Use timeouts on the initial receives — with a Shared subscription the broker may - // dispatch all messages to a single consumer (receiverQueueSize is larger than the - // number of messages per partition), and a blocking receive() would hang. - Message<byte[]> message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS); - Message<byte[]> message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS); int messageCount1 = 0; int messageCount2 = 0; int ackCount1 = 0; int ackCount2 = 0; - do { + while (true) { + Message<byte[]> message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); + Message<byte[]> message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); + if (message1 == null && message2 == null) { + break; + } if (message1 != null) { log.info("Consumer1 received " + new String(message1.getData())); messageCount1 += 1; @@ -536,9 +543,7 @@ public class ResendRequestTest extends BrokerTestBase { ackCount2 += 1; } } - message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); - message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); - } while (message1 != null || message2 != null); + } log.info(key + " messageCount1 = " + messageCount1); log.info(key + " messageCount2 = " + messageCount2); log.info(key + " ackCount1 = " + ackCount1); @@ -553,10 +558,13 @@ public class ResendRequestTest extends BrokerTestBase { } // 6. Check if Messages redelivered again - message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS); - message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS); messageCount1 = 0; - do { + while (true) { + Message<byte[]> message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); + Message<byte[]> message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); + if (message1 == null && message2 == null) { + break; + } if (message1 != null) { log.info("Consumer1 received " + new String(message1.getData())); messageCount1 += 1; @@ -565,15 +573,11 @@ public class ResendRequestTest extends BrokerTestBase { log.info("Consumer2 received " + new String(message2.getData())); messageCount2 += 1; } - message1 = consumer1.receive(1000, TimeUnit.MILLISECONDS); - message2 = consumer2.receive(1000, TimeUnit.MILLISECONDS); - } while (message1 != null || message2 != null); - + } log.info(key + " messageCount1 = " + messageCount1); log.info(key + " messageCount2 = " + messageCount2); log.info(key + " ackCount1 = " + ackCount1); log.info(key + " ackCount2 = " + ackCount2); - newPulsarClient.close(); assertEquals(messageCount1 + messageCount2 + ackCount1, totalMessages); }
