This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new da319a98cee [fix][ml]Still got BK ledger, even though it has been
deleted after offloaded (#24432)
da319a98cee is described below
commit da319a98ceea7e0a82396faa4162a1f40638b48b
Author: fengyubiao <[email protected]>
AuthorDate: Sat Jun 21 00:03:39 2025 +0800
[fix][ml]Still got BK ledger, even though it has been deleted after
offloaded (#24432)
(cherry picked from commit 73a4ae4f886936ec33e5c8086b6c0c0151ea7f76)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 34 ++++--
.../mledger/impl/OffloadLedgerDeleteTest.java | 131 ++++++++++++++++++++-
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 2 +-
3 files changed, 154 insertions(+), 13 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f80170aa491..eac77eeab4f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2887,7 +2887,15 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
for (LedgerInfo ls : offloadedLedgersToDelete) {
log.info("[{}] Deleting offloaded ledger {} from
bookkeeper - size: {}", name, ls.getLedgerId(),
ls.getSize());
- asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
+ invalidateReadHandle(ls.getLedgerId());
+
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()).thenAccept(__ -> {
+ log.info("[{}] Deleted and invalidated offloaded
ledger {} from bookkeeper - size: {}",
+ name, ls.getLedgerId(), ls.getSize());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to delete offloaded ledger
{} from bookkeeper - size: {}",
+ name, ls.getLedgerId(), ls.getSize(), ex);
+ return null;
+ });
}
promise.complete(null);
}
@@ -3108,8 +3116,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
- private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
- asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+ private CompletableFuture<Void> asyncDeleteLedgerFromBookKeeper(long
ledgerId) {
+ return asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
}
private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
@@ -3126,22 +3134,32 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
}
- private void asyncDeleteLedger(long ledgerId, long retry) {
- if (retry <= 0) {
- log.warn("[{}] Failed to delete ledger after retries {}", name,
ledgerId);
- return;
- }
+ private CompletableFuture<Void> asyncDeleteLedger(long ledgerId, long
retry) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ asyncDeleteLedgerWithRetry(future, ledgerId, retry);
+ return future;
+ }
+
+ private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future,
long ledgerId, long retry) {
bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
+ future.complete(null);
} else if (rc != BKException.Code.OK) {
log.error("[{}] Error deleting ledger {} : {}", name,
ledgerId, BKException.getMessage(rc));
+ if (retry <= 1) {
+ // The latest once of retry has failed
+ log.warn("[{}] Failed to delete ledger after retries {},
code: {}", name, ledgerId, rc);
+ future.completeExceptionally(BKException.create(rc));
+ return;
+ }
scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId,
retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC,
TimeUnit.SECONDS);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleted ledger {}", name, ledgerId);
}
+ future.complete(null);
}
}, null);
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
index b46f06106cf..a0f6c32a288 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -19,29 +19,33 @@
package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
-
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -207,6 +211,125 @@ public class OffloadLedgerDeleteTest extends
MockedBookKeeperTestCase {
assertEventuallyTrue(() ->
offloader.deletedOffloads().contains(firstLedgerId));
}
+ @Test
+ public void testGetReadLedgerHandleAfterTrimOffloadedLedgers() throws
Exception {
+ // Create managed ledger.
+ final long offloadThresholdSeconds = 5;
+ final long offloadDeletionLagInSeconds = 1;
+ OffloadPrefixTest.MockLedgerOffloader offloader = new
OffloadPrefixTest.MockLedgerOffloader();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInSeconds
* 1000);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(offloadThresholdSeconds);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
+ config.setLedgerOffloader(offloader);
+ ManagedLedgerImpl ml =
+
(ManagedLedgerImpl)factory.open("testGetReadLedgerHandleAfterTrimOffloadedLedgers",
config);
+ ml.openCursor("c1");
+
+ // Write entries.
+ int i = 0;
+ for (; i < 35; i++) {
+ String content = "entry-" + i;
+ ml.addEntry(content.getBytes());
+ }
+ Assert.assertEquals(ml.getLedgersInfoAsList().size(), 4);
+ long ledger1 = ml.getLedgersInfoAsList().get(0).getLedgerId();
+ long ledger2 = ml.getLedgersInfoAsList().get(1).getLedgerId();
+ long ledger3 = ml.getLedgersInfoAsList().get(2).getLedgerId();
+ long ledger4 = ml.getLedgersInfoAsList().get(3).getLedgerId();
+
+ // Offload ledgers.
+ Thread.sleep(offloadThresholdSeconds * 2 * 1000);
+ CompletableFuture<PositionImpl> offloadFuture = new
CompletableFuture<PositionImpl>();
+ ml.maybeOffloadInBackground(offloadFuture);
+ offloadFuture.join();
+
+ // Cache ledger handle.
+ CountDownLatch readCountDownLatch = new CountDownLatch(4);
+ AsyncCallbacks.ReadEntryCallback readCb = new
AsyncCallbacks.ReadEntryCallback(){
+
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ readCountDownLatch.countDown();
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
+ readCountDownLatch.countDown();
+ }
+ };
+ ml.asyncReadEntry(PositionImpl.get(ledger1, 0), readCb, null);
+ ml.asyncReadEntry(PositionImpl.get(ledger2, 0), readCb, null);
+ ml.asyncReadEntry(PositionImpl.get(ledger3, 0), readCb, null);
+ ml.asyncReadEntry(PositionImpl.get(ledger4, 0), readCb, null);
+ readCountDownLatch.await();
+ ReadHandle originalReadHandle4 = ml.getLedgerHandle(ledger4).join();
+
+ // Trim offloaded BK ledger handles.
+ Thread.sleep(offloadDeletionLagInSeconds * 2 * 1000);
+ CompletableFuture<Position> trimLedgerFuture = new
CompletableFuture<Position>();
+ ml.internalTrimLedgers(false, trimLedgerFuture);
+ trimLedgerFuture.join();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 =
ml.getLedgerInfo(ledger1).get();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 =
ml.getLedgerInfo(ledger2).get();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo3 =
ml.getLedgerInfo(ledger3).get();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo4 =
ml.getLedgerInfo(ledger4).get();
+ Assert.assertTrue(ledgerInfo1.hasOffloadContext() &&
ledgerInfo1.getOffloadContext().getBookkeeperDeleted());
+ Assert.assertTrue(ledgerInfo2.hasOffloadContext() &&
ledgerInfo2.getOffloadContext().getBookkeeperDeleted());
+ Assert.assertTrue(ledgerInfo3.hasOffloadContext() &&
ledgerInfo3.getOffloadContext().getBookkeeperDeleted());
+ Assert.assertFalse(ledgerInfo4.hasOffloadContext() ||
ledgerInfo4.getOffloadContext().getBookkeeperDeleted());
+
+ Awaitility.await().untilAsserted(() -> {
+ try {
+ factory.getBookKeeper().get().openLedger(ledger3,
ml.digestType, ml.config.getPassword());
+ Assert.fail("Should fail: the ledger has been deleted");
+ } catch (BKException.BKNoSuchLedgerExistsException ex) {
+ // Expected.
+ }
+ try {
+ factory.getBookKeeper().get().openLedger(ledger2,
ml.digestType, ml.config.getPassword());
+ Assert.fail("Should fail: the ledger has been deleted");
+ } catch (BKException.BKNoSuchLedgerExistsException ex) {
+ // Expected.
+ }
+ try {
+ factory.getBookKeeper().get().openLedger(ledger1,
ml.digestType, ml.config.getPassword());
+ Assert.fail("Should fail: the ledger has been deleted");
+ } catch (BKException.BKNoSuchLedgerExistsException ex) {
+ // Expected.
+ }
+ });
+
+ // Verify: "ml.getLedgerHandle" returns a correct ledger handle.
+ ReadHandle currentReadHandle4 = ml.getLedgerHandle(ledger4).join();
+ Assert.assertEquals(currentReadHandle4, originalReadHandle4);
+ try {
+ ml.getLedgerHandle(ledger3).join();
+ Assert.fail("should get a failure: MockLedgerOffloader does not
support read");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getCause().getCause().getMessage()
+ .contains("MockLedgerOffloader does not support read"));
+ }
+ try {
+ ml.getLedgerHandle(ledger2).join();
+ Assert.fail("should get a failure: MockLedgerOffloader does not
support read");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getCause().getCause().getMessage()
+ .contains("MockLedgerOffloader does not support read"));
+ }
+ try {
+ ml.getLedgerHandle(ledger1).join();
+ Assert.fail("should get a failure: MockLedgerOffloader does not
support read");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getCause().getCause().getMessage()
+ .contains("MockLedgerOffloader does not support read"));
+ }
+ }
+
@Test(timeOut = 5000)
public void testFileSystemOffloadDeletePath() throws Exception {
MockFileSystemLedgerOffloader offloader = new
MockFileSystemLedgerOffloader();
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 53fa1725585..2a13a7cfc61 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -1292,7 +1292,7 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID
uuid,
Map<String, String>
offloadDriverMetadata) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
- promise.completeExceptionally(new UnsupportedOperationException());
+ promise.completeExceptionally(new
UnsupportedOperationException("MockLedgerOffloader does not support read"));
return promise;
}