This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36d0c1c1091 [fix][test] Fix flaky PersistentStickyKeyDispatcherMultipleConsumersTest.testSkipRedeliverTemporally (#25843)
36d0c1c1091 is described below

commit 36d0c1c109186f9e33c94f301de8b5687a6f98de
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 20 19:47:29 2026 +0300

    [fix][test] Fix flaky PersistentStickyKeyDispatcherMultipleConsumersTest.testSkipRedeliverTemporally (#25843)
---
 ...sistentStickyKeyDispatcherMultipleConsumersTest.java | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 26e16616d1a..81bf59fe373 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -407,13 +407,23 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
             return Collections.emptySet();
         }).when(cursorMock).asyncReplayEntries(anySet(), any(), any(), 
anyBoolean());
 
+        // Simulate real cursor behavior: track read position so entries are 
only returned once
+        // by normal reads (subsequent access must go through 
asyncReplayEntries).
+        // When no new entries are available, don't call the callback 
(simulating "OrWait" behavior).
+        Set<Position> normalReadReturned = new ConcurrentSkipListSet<>();
         doAnswer(invocationOnMock -> {
             int maxEntries = invocationOnMock.getArgument(0);
             AsyncCallbacks.ReadEntriesCallback callback = 
invocationOnMock.getArgument(2);
             List<Entry> entries = allEntries.stream()
-                    .filter(entry -> entry.getLedgerId() != -1 && 
!alreadySent.contains(entry.getPosition()))
+                    .filter(entry -> entry.getLedgerId() != -1
+                            && 
!normalReadReturned.contains(entry.getPosition()))
                     .limit(maxEntries)
                     .toList();
+            if (entries.isEmpty()) {
+                // No new entries available - simulate "wait" by not calling 
callback
+                return null;
+            }
+            entries.forEach(e -> normalReadReturned.add(e.getPosition()));
             Object ctx = invocationOnMock.getArgument(3);
             callback.readEntriesComplete(copyEntries(entries), ctx);
             return null;
@@ -457,6 +467,11 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         // set permits to 2
         slowConsumerAvailablePermits.set(2);
 
+        // Trigger a new read cycle so the dispatcher can do a replay read to 
deliver
+        // messages to the slow consumer. In production, this would be 
triggered by
+        // consumerFlow when the consumer sends more permits.
+        persistentDispatcher.readMoreEntriesAsync();
+
         // now wait for slow consumer messages since there are permits
         assertTrue(slowConsumerMessagesSent.await(5, TimeUnit.SECONDS));
 

Reply via email to