This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ac7e9d84bb9cf49ab30e7640c454aad1d7bc7e08 Author: Yunze Xu <[email protected]> AuthorDate: Wed Jul 16 22:33:18 2025 +0800 [improve][test] Add test for concurrent processing of pending read Entries (#24519) (cherry picked from commit 08caff42b71e619b66094df6b3bcaeb1eba1e97a) --- .../impl/cache/PendingReadsManagerTest.java | 50 ++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java index ebf01cba269..9113c123915 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java @@ -32,10 +32,17 @@ import io.opentelemetry.api.OpenTelemetry; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.stream.Collectors; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -47,6 +54,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.tuple.Pair; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.AfterClass; @@ -60,6 +68,7 @@ public class 
PendingReadsManagerTest { static final Object CTX = "foo"; static final Object CTX2 = "far"; static final long LEDGER_ID = 123414L; + private final Map<Pair<Long, Long>, AtomicInteger> entryRangeReadCount = new ConcurrentHashMap<>(); ExecutorService orderedExecutor; PendingReadsManagerTest() { @@ -116,6 +125,7 @@ public class PendingReadsManagerTest { ml = mock(ManagedLedgerImpl.class); when(ml.getExecutor()).thenReturn(orderedExecutor); when(rangeEntryCache.getManagedLedger()).thenReturn(ml); + entryRangeReadCount.clear(); } @@ -192,6 +202,8 @@ public class PendingReadsManagerTest { (invocationOnMock -> { log.info("readFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry); + entryRangeReadCount.computeIfAbsent(Pair.of(firstEntry, endEntry), __ -> new AtomicInteger(0)) + .getAndIncrement(); return read; }) ); @@ -477,4 +489,42 @@ public class PendingReadsManagerTest { } + @Test + public void concurrentReadOnOverlappedEntryRanges() throws Exception { + final var readFutures = new ArrayList<CapturingReadEntriesCallback>(); + final BiConsumer<Long, Long> readEntries = (firstEntry, lastEntry) -> { + final var callback = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, lastEntry, false, callback, CTX); + readFutures.add(callback); + }; + final BiFunction<Long, Long, PreparedReadFromStorage> mockReadFromStorage = (firstEntry, lastEntry) -> + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, lastEntry, false); + + final var read0 = mockReadFromStorage.apply(10L, 70L); + readEntries.accept(10L, 70L); + final var read1 = mockReadFromStorage.apply(80L, 100L); + readEntries.accept(80L, 100L); + final var read2 = mockReadFromStorage.apply(71L, 79L); + readEntries.accept(10L, 100L); + + read1.storageReadCompleted(); + readFutures.get(1).get(1, TimeUnit.SECONDS); + assertEquals(readFutures.get(1).getEntries().size(), 21); + + read0.storageReadCompleted(); + readFutures.get(0).get(1, TimeUnit.SECONDS); + 
assertEquals(readFutures.get(0).getEntries().size(), 61); + + read2.storageReadCompleted(); + readFutures.get(2).get(1, TimeUnit.SECONDS); + assertEquals(readFutures.get(2).getEntries().size(), 91); + + log.info("entryRangeReadCount: {}", entryRangeReadCount); + final var keys = Set.of(Pair.of(10L, 70L), Pair.of(71L, 79L), + Pair.of(80L, 100L)); + assertEquals(entryRangeReadCount.keySet(), keys); + for (final var key : keys) { + assertEquals(entryRangeReadCount.get(key).get(), 1); + } + } }
