This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ac7e9d84bb9cf49ab30e7640c454aad1d7bc7e08
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jul 16 22:33:18 2025 +0800

    [improve][test] Add test for concurrent processing of pending read Entries 
(#24519)
    
    (cherry picked from commit 08caff42b71e619b66094df6b3bcaeb1eba1e97a)
---
 .../impl/cache/PendingReadsManagerTest.java        | 50 ++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
index ebf01cba269..9113c123915 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -32,10 +32,17 @@ import io.opentelemetry.api.OpenTelemetry;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -47,6 +54,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.tuple.Pair;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterClass;
@@ -60,6 +68,7 @@ public class PendingReadsManagerTest  {
     static final Object CTX = "foo";
     static final Object CTX2 = "far";
     static final long LEDGER_ID = 123414L;
+    private final Map<Pair<Long, Long>, AtomicInteger> entryRangeReadCount = 
new ConcurrentHashMap<>();
     ExecutorService orderedExecutor;
 
     PendingReadsManagerTest() {
@@ -116,6 +125,7 @@ public class PendingReadsManagerTest  {
         ml = mock(ManagedLedgerImpl.class);
         when(ml.getExecutor()).thenReturn(orderedExecutor);
         when(rangeEntryCache.getManagedLedger()).thenReturn(ml);
+        entryRangeReadCount.clear();
     }
 
 
@@ -192,6 +202,8 @@ public class PendingReadsManagerTest  {
                 (invocationOnMock -> {
                     log.info("readFromStorage from {} to {} shouldCacheEntry 
{}", firstEntry, endEntry,
                             shouldCacheEntry);
+                    entryRangeReadCount.computeIfAbsent(Pair.of(firstEntry, 
endEntry), __ -> new AtomicInteger(0))
+                            .getAndIncrement();
                     return read;
                 })
         );
@@ -477,4 +489,42 @@ public class PendingReadsManagerTest  {
 
     }
 
+    @Test
+    public void concurrentReadOnOverlappedEntryRanges() throws Exception {
+        final var readFutures = new ArrayList<CapturingReadEntriesCallback>();
+        final BiConsumer<Long, Long> readEntries = (firstEntry, lastEntry) -> {
+            final var callback = new CapturingReadEntriesCallback();
+            pendingReadsManager.readEntries(lh, firstEntry, lastEntry, false, 
callback, CTX);
+            readFutures.add(callback);
+        };
+        final BiFunction<Long, Long, PreparedReadFromStorage> 
mockReadFromStorage = (firstEntry, lastEntry) ->
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, 
lastEntry, false);
+
+        final var read0 = mockReadFromStorage.apply(10L, 70L);
+        readEntries.accept(10L, 70L);
+        final var read1 = mockReadFromStorage.apply(80L, 100L);
+        readEntries.accept(80L, 100L);
+        final var read2 = mockReadFromStorage.apply(71L, 79L);
+        readEntries.accept(10L, 100L);
+
+        read1.storageReadCompleted();
+        readFutures.get(1).get(1, TimeUnit.SECONDS);
+        assertEquals(readFutures.get(1).getEntries().size(), 21);
+
+        read0.storageReadCompleted();
+        readFutures.get(0).get(1, TimeUnit.SECONDS);
+        assertEquals(readFutures.get(0).getEntries().size(), 61);
+
+        read2.storageReadCompleted();
+        readFutures.get(2).get(1, TimeUnit.SECONDS);
+        assertEquals(readFutures.get(2).getEntries().size(), 91);
+
+        log.info("entryRangeReadCount: {}", entryRangeReadCount);
+        final var keys = Set.of(Pair.of(10L, 70L), Pair.of(71L, 79L),
+                Pair.of(80L, 100L));
+        assertEquals(entryRangeReadCount.keySet(), keys);
+        for (final var key : keys) {
+            assertEquals(entryRangeReadCount.get(key).get(), 1);
+        }
+    }
 }

Reply via email to