This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36d0c1c1091 [fix][test] Fix flaky PersistentStickyKeyDispatcherMultipleConsumersTest.testSkipRedeliverTemporally (#25843)
36d0c1c1091 is described below
commit 36d0c1c109186f9e33c94f301de8b5687a6f98de
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 20 19:47:29 2026 +0300
[fix][test] Fix flaky PersistentStickyKeyDispatcherMultipleConsumersTest.testSkipRedeliverTemporally (#25843)
---
...sistentStickyKeyDispatcherMultipleConsumersTest.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 26e16616d1a..81bf59fe373 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -407,13 +407,23 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
return Collections.emptySet();
}).when(cursorMock).asyncReplayEntries(anySet(), any(), any(),
anyBoolean());
+ // Simulate real cursor behavior: track read position so entries are only returned once
+ // by normal reads (subsequent access must go through asyncReplayEntries).
+ // When no new entries are available, don't call the callback (simulating "OrWait" behavior).
+ Set<Position> normalReadReturned = new ConcurrentSkipListSet<>();
doAnswer(invocationOnMock -> {
int maxEntries = invocationOnMock.getArgument(0);
AsyncCallbacks.ReadEntriesCallback callback =
invocationOnMock.getArgument(2);
List<Entry> entries = allEntries.stream()
- .filter(entry -> entry.getLedgerId() != -1 &&
!alreadySent.contains(entry.getPosition()))
+ .filter(entry -> entry.getLedgerId() != -1
+ &&
!normalReadReturned.contains(entry.getPosition()))
.limit(maxEntries)
.toList();
+ if (entries.isEmpty()) {
+ // No new entries available - simulate "wait" by not calling callback
+ return null;
+ }
+ entries.forEach(e -> normalReadReturned.add(e.getPosition()));
Object ctx = invocationOnMock.getArgument(3);
callback.readEntriesComplete(copyEntries(entries), ctx);
return null;
@@ -457,6 +467,11 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
// set permits to 2
slowConsumerAvailablePermits.set(2);
+ // Trigger a new read cycle so the dispatcher can do a replay read to deliver
+ // messages to the slow consumer. In production, this would be triggered by
+ // consumerFlow when the consumer sends more permits.
+ persistentDispatcher.readMoreEntriesAsync();
+
// now wait for slow consumer messages since there are permits
assertTrue(slowConsumerMessagesSent.await(5, TimeUnit.SECONDS));