This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f1974aa6af5bd32d0f14f7c402d8e2c6f1baae25 Author: Yunze Xu <[email protected]> AuthorDate: Sat Jul 19 12:26:22 2025 +0800 [fix][ml] Fix asyncReadEntries might never complete if empty entries are read from BK (#24515) (cherry picked from commit eea7c13fb75e7acea012fbe7b46be0f4f7558a90) --- .../mledger/impl/ManagedLedgerFactoryImpl.java | 31 ++++-- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 +- .../bookkeeper/mledger/impl/OpReadEntry.java | 104 +++++++++++++++------ .../mledger/impl/cache/RangeEntryCacheImpl.java | 5 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 38 ++++++++ .../bookkeeper/test/MockedBookKeeperTestCase.java | 22 ++++- 6 files changed, 168 insertions(+), 37 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 627e3225519..ac924258556 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; import com.google.common.collect.BoundType; import com.google.common.collect.Maps; @@ -47,6 +48,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; @@ -108,6 +110,7 @@ import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.api.extended.SessionEvent; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -170,7 +173,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { ManagedLedgerFactoryConfig config) throws Exception { this(metadataStore, new DefaultBkFactory(bkClientConfiguration), - true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop()); + true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(), null); } public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper) @@ -184,12 +187,21 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper), config); } + @VisibleForTesting + public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper, + @Nullable Function<ManagedLedgerFactoryImpl, EntryCacheManager> + entryCacheManagerCreator) + throws Exception { + this(metadataStore, __ -> CompletableFuture.completedFuture(bookKeeper), false, + new ManagedLedgerFactoryConfig(), NullStatsLogger.INSTANCE, OpenTelemetry.noop(), + entryCacheManagerCreator); + } + public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, ManagedLedgerFactoryConfig config) throws Exception { - this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */, - config, NullStatsLogger.INSTANCE, OpenTelemetry.noop()); + this(metadataStore, bookKeeperGroupFactory, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop()); } public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, @@ -198,7 +210,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { OpenTelemetry openTelemetry) throws Exception { this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */, - config, statsLogger, openTelemetry); + config, statsLogger, openTelemetry, null); } private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, @@ -206,7 +218,10 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { boolean isBookkeeperManaged, ManagedLedgerFactoryConfig config, StatsLogger statsLogger, - OpenTelemetry openTelemetry) throws Exception { + OpenTelemetry openTelemetry, + @Nullable Function<ManagedLedgerFactoryImpl, EntryCacheManager> + entryCacheManagerCreator) + throws Exception { MetadataCompressionConfig compressionConfigForManagedLedgerInfo = config.getCompressionConfigForManagedLedgerInfo(); MetadataCompressionConfig compressionConfigForManagedCursorInfo = @@ -228,7 +243,11 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { compressionConfigForManagedCursorInfo); this.config = config; this.mbean = new ManagedLedgerFactoryMBeanImpl(this); - this.entryCacheManager = new RangeEntryCacheManagerImpl(this, scheduledExecutor, openTelemetry); + if (entryCacheManagerCreator == null) { + this.entryCacheManager = new RangeEntryCacheManagerImpl(this, scheduledExecutor, openTelemetry); + } else { + this.entryCacheManager = entryCacheManagerCreator.apply(this); + } this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats), 0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS); this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 2022fd395e1..f98fc8b425d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2155,8 +2155,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // If all messages in [firstEntry...lastEntry] are filter out, // then manual call internalReadEntriesComplete to advance read position. if (firstValidEntry == -1L) { - opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, - PositionFactory.create(ledger.getId(), lastEntry)); + final var nextReadPosition = PositionFactory.create(ledger.getId(), lastEntry).getNext(); + opReadEntry.updateReadPosition(nextReadPosition); + opReadEntry.checkReadCompletion(); return; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index a4928b44bd9..2dcbe50a62c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.function.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; @@ -67,7 +66,12 @@ class OpReadEntry implements ReadEntriesCallback { return op; } - void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, Position lastPosition) { + private void internalReadEntriesComplete(List<Entry> returnedEntries) { + if (returnedEntries.isEmpty()) { + log.warn("[{}] Read no entries unexpectedly", this); + checkReadCompletion(); + return; + } // Filter the returned entries for individual deleted messages int entriesCount = returnedEntries.size(); long entriesSize = 0; @@ -76,19 +80,15 @@ class OpReadEntry implements ReadEntriesCallback { } cursor.updateReadStats(entriesCount, entriesSize); - if (entriesCount != 0) { - lastPosition = returnedEntries.get(entriesCount - 1).getPosition(); - } if (log.isDebugEnabled()) { log.debug("[{}][{}] Read entries succeeded batch_size={} cumulative_size={} requested_count={}", cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count); } - List<Entry> filteredEntries = Collections.emptyList(); - if (entriesCount != 0) { - filteredEntries = cursor.filterReadEntries(returnedEntries); - entries.addAll(filteredEntries); - } + // Entries might be released after `filterReadEntries`, so retrieve the last position before that + final var lastPosition = returnedEntries.get(entriesCount - 1).getPosition(); + final var filteredEntries = cursor.filterReadEntries(returnedEntries); + entries.addAll(filteredEntries); // if entries have been filtered out then try to skip reading of already deletedMessages in that range final Position nexReadPosition = entriesCount != filteredEntries.size() @@ -99,19 +99,30 @@ class OpReadEntry implements ReadEntriesCallback { @Override public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) { - internalReadEntriesComplete(returnedEntries, ctx, null); + try { + internalReadEntriesComplete(returnedEntries); + } catch (Throwable throwable) { + log.error("[{}] Fallback to readEntriesFailed for exception in readEntriesComplete", this, throwable); + readEntriesFailed(ManagedLedgerException.getManagedLedgerException(throwable), ctx); + } } @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + try { + internalReadEntriesFailed(exception, ctx); + } catch (Throwable throwable) { + // At least we should complete the callback + fail(ManagedLedgerException.getManagedLedgerException(throwable), ctx); + } + } + + private void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) { cursor.readOperationCompleted(); if (!entries.isEmpty()) { // There were already some entries that were read before, we can return them - cursor.ledger.getExecutor().execute(() -> { - callback.readEntriesComplete(entries, ctx); - recycle(); - }); + complete(ctx); } else if (cursor.getConfig().isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) { log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(), @@ -129,9 +140,7 @@ class OpReadEntry implements ReadEntriesCallback { } // fail callback if it couldn't find next valid ledger if (nexReadPosition == null) { - callback.readEntriesFailed(exception, ctx); - cursor.ledger.mbean.recordReadEntriesError(); - recycle(); + fail(exception, ctx); return; } updateReadPosition(nexReadPosition); @@ -152,9 +161,7 @@ class OpReadEntry implements ReadEntriesCallback { } } - callback.readEntriesFailed(exception, ctx); - cursor.ledger.mbean.recordReadEntriesError(); - recycle(); + fail(exception, ctx); } } @@ -177,12 +184,8 @@ class OpReadEntry implements ReadEntriesCallback { // The reading was already completed, release resources and trigger callback try { cursor.readOperationCompleted(); - } finally { - cursor.ledger.getExecutor().execute(() -> { - callback.readEntriesComplete(entries, ctx); - recycle(); - }); + complete(ctx); } } } @@ -217,5 +220,54 @@ class OpReadEntry implements ReadEntriesCallback { recyclerHandle.recycle(this); } + private void complete(Object ctx) { + cursor.ledger.getExecutor().execute(() -> { + try { + callback.readEntriesComplete(entries, ctx); + recycle(); + } catch (Throwable throwable) { + log.error("[{}] readEntriesComplete failed (last position: {})", this, lastEntryPosition(), throwable); + } + }); + } + + private void fail(ManagedLedgerException e, Object ctx) { + try { + callback.readEntriesFailed(e, ctx); + cursor.ledger.mbean.recordReadEntriesError(); + recycle(); + } catch (Throwable throwable) { + log.error("[{}] readEntriesFailed failed (exception: {})", this, e.getMessage(), throwable); + } + } + + @Override + public String toString() { + final var cursor = this.cursor; + final var readPosition = this.readPosition; + final var nextReadPosition = this.nextReadPosition; + final var entries = this.entries; + final var maxPosition = this.maxPosition; + final var count = this.count; + if (cursor != null) { + return cursor.ledger.getName() + " " + cursor.getName() + "{ readPosition: " + + (readPosition != null ? readPosition : "(null)") + ", nextReadPosition: " + + (nextReadPosition != null ? nextReadPosition : "(null)") + ", maxPosition: " + + (maxPosition != null ? maxPosition : "(null)") + ", entries count: " + + (entries != null ? entries.size() : "(null)") + ", count: " + count + " }"; + } else { + return "(null)"; + } + } + + private String lastEntryPosition() { + final var entries = this.entries; + if (entries != null) { + return entries.isEmpty() ? "(empty)" : entries.get(entries.size() - 1).getPosition().toString(); + } else { + return "(null)"; + } + } + private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index ad5630c078a..d4fbeec9617 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -440,8 +440,9 @@ public class RangeEntryCacheImpl implements EntryCache { * @param shouldCacheEntry if we should put the entry into the cache * @return a handle to the operation */ - CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, - long firstEntry, long lastEntry, boolean shouldCacheEntry) { + @VisibleForTesting + protected CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, + boolean shouldCacheEntry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry) .thenApply( diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 34b67c5d020..b900ed1ecf8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -71,6 +71,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import lombok.Cleanup; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; @@ -99,6 +100,8 @@ import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.ScanOutcome; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; @@ -121,6 +124,7 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -133,6 +137,10 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } + @AfterMethod + public void afterMethod() { + setEntryCacheCreator(null); + } @Test public void testCloseCursor() throws Exception { @@ -5402,6 +5410,36 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertTrue(recovered.booleanValue()); } + @Test(timeOut = 10000) + public void testReadNoEntries() throws Exception { + final var firstRead = new AtomicBoolean(true); + setEntryCacheCreator(ml -> new RangeEntryCacheImpl((RangeEntryCacheManagerImpl) factory.getEntryCacheManager(), + ml, factory.getConfig().isCopyEntriesInCache()) { + + @Override + protected CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, + boolean shouldCacheEntry) { + if (firstRead.compareAndSet(true, false)) { + return CompletableFuture.completedFuture(List.of()); + } + return super.readFromStorage(lh, firstEntry, lastEntry, shouldCacheEntry); + } + }); + final var ml = factory.open("testReadNoEntries"); + final var cursor = ml.openCursor("cursor"); + cursor.setInactive(); // disable caching when adding entries + for (int i = 0; i < 10; i++) { + ml.addEntry(("msg-" + i).getBytes(StandardCharsets.UTF_8)); + } + final var entries = cursor.readEntries(10); + assertEquals(entries.stream().map(e -> { + final var buffer = e.getDataBuffer(); + final var bytes = new byte[buffer.readableBytes()]; + buffer.readBytes(bytes); + return new String(bytes, StandardCharsets.UTF_8); + }).toList(), IntStream.range(0, 10).mapToObj(i -> "msg-" + i).toList()); + } + class TestPulsarMockBookKeeper extends PulsarMockBookKeeper { Map<Long, Integer> ledgerErrors = new HashMap<>(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index 178a9cf984d..a23fbea96a6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -18,17 +18,22 @@ */ package org.apache.bookkeeper.test; +import io.opentelemetry.api.OpenTelemetry; import java.lang.reflect.Method; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import lombok.SneakyThrows; import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.cache.EntryCache; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -57,6 +62,7 @@ public abstract class MockedBookKeeperTestCase { protected ExecutorService cachedExecutor; protected FaultInjectionMetadataStore metadataStore; + private Function<ManagedLedgerImpl, EntryCache> entryCacheCreator = null; public MockedBookKeeperTestCase() { // By default start a 3 bookies cluster @@ -84,7 +90,17 @@ public abstract class MockedBookKeeperTestCase { ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); initManagedLedgerFactoryConfig(managedLedgerFactoryConfig); - factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); + factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, __ -> new RangeEntryCacheManagerImpl(__, + __.getScheduledExecutor(), OpenTelemetry.noop()) { + + @Override + public EntryCache getEntryCache(ManagedLedgerImpl ml) { + if (entryCacheCreator != null) { + return entryCacheCreator.apply(ml); + } + return super.getEntryCache(ml); + } + }); setUpTestCase(); } @@ -164,4 +180,8 @@ public abstract class MockedBookKeeperTestCase { protected void stopMetadataStore() { metadataStore.setAlwaysFail(new MetadataStoreException("failed")); } + + protected void setEntryCacheCreator(Function<ManagedLedgerImpl, EntryCache> entryCacheCreator) { + this.entryCacheCreator = entryCacheCreator; + } }
