This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6001c41 Delete offloaded ledger when ledger deleted (#1641) 6001c41 is described below commit 6001c417c7edf5892c55bc64e299023f16a5fbd9 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Wed Apr 25 23:33:21 2018 +0200 Delete offloaded ledger when ledger deleted (#1641) * Delete offloaded ledger when ledger deleted When a managed ledger trims a ledger, if that ledger has been offloaded to long term storage, delete it from long term storage also. Currently, it will always try to delete the bookkeeper ledger also, even if the ledger has already been offloaded. Handling for this case will be added in a later patch along with delayed bookkeeper ledger deletion in the case of offload. Master Issue: #1511 * Fixup for delete --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++-- .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 98 ++++++++++++++++++++++ 2 files changed, 110 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index db644b2..50bc85d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1666,7 +1666,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { for (LedgerInfo ls : ledgersToDelete) { log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); - asyncDeleteLedger(ls.getLedgerId()); + asyncDeleteLedger(ls.getLedgerId(), ls); } } @@ -1758,8 +1758,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - private void asyncDeleteLedger(long ledgerId) { + private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); + + if (info.getOffloadContext().hasUidMsb()) { + UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), + info.getOffloadContext().getUidLsb()); + cleanupOffloaded(ledgerId, uuid, "Trimming"); + } } private void asyncDeleteLedger(long ledgerId, long retry) { @@ -1963,7 +1969,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { scheduledExecutor, name) .whenComplete((ignore2, exception) -> { if (exception != null) { - cleanupOffloadedOnFailure(ledgerId, uuid, "Metastore failure"); + cleanupOffloaded(ledgerId, uuid, "Metastore failure"); } }); }) @@ -2056,7 +2062,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { oldInfo.getOffloadContext().getUidLsb()); log.info("[{}] Found previous offload attempt for ledger {}, uuid {}" + ", cleaning up", name, ledgerId, uuid); - cleanupOffloadedOnFailure(ledgerId, oldUuid, "Previous failed offload"); + cleanupOffloaded(ledgerId, oldUuid, "Previous failed offload"); } LedgerInfo.Builder builder = oldInfo.toBuilder(); builder.getOffloadContextBuilder() @@ -2078,7 +2084,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return transformLedgerInfo(ledgerId, (oldInfo) -> { UUID existingUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(), - oldInfo.getOffloadContext().getUidLsb()); + oldInfo.getOffloadContext().getUidLsb()); if (existingUuid.equals(uuid)) { LedgerInfo.Builder builder = oldInfo.toBuilder(); builder.getOffloadContextBuilder() @@ -2101,7 +2107,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }); } - private void cleanupOffloadedOnFailure(long ledgerId, UUID uuid, String cleanupReason) { + private void cleanupOffloaded(long ledgerId, UUID uuid, String cleanupReason) { Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10), Retries.NonFatalPredicate, () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid), diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 230456d..311a82e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -541,6 +541,98 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete()); } + @Test + public void testOffloadDelete() throws Exception { + Set<Pair<Long, UUID>> deleted = ConcurrentHashMap.newKeySet(); + CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>(); + Set<Pair<Long, UUID>> failedOffloads = ConcurrentHashMap.newKeySet(); + + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(0, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + ManagedCursor cursor = ledger.openCursor("foobar"); + for (int i = 0; i < 15; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()).count(), 1); + Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete()); + long firstLedger = ledger.getLedgersInfoAsList().get(0).getLedgerId(); + long secondLedger = ledger.getLedgersInfoAsList().get(1).getLedgerId(); + + cursor.markDelete(ledger.getLastConfirmedEntry()); + assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 1); + Assert.assertEquals(ledger.getLedgersInfoAsList().get(0).getLedgerId(), secondLedger); + + assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger)); + } + + @Test + public void testOffloadDeleteIncomplete() throws Exception { + Set<Pair<Long, UUID>> deleted = ConcurrentHashMap.newKeySet(); + CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>(); + Set<Pair<Long, UUID>> failedOffloads = ConcurrentHashMap.newKeySet(); + + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture<Void> offload(ReadHandle ledger, + UUID uuid, + Map<String, String> extraMetadata) { + return super.offload(ledger, uuid, extraMetadata) + .thenCompose((res) -> { + CompletableFuture<Void> f = new CompletableFuture<>(); + f.completeExceptionally(new Exception("Fail after offload occurred")); + return f; + }); + } + }; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(0, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + ManagedCursor cursor = ledger.openCursor("foobar"); + for (int i = 0; i < 15; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + try { + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + } catch (ManagedLedgerException mle) { + // expected + } + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()).count(), 0); + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().hasUidMsb()).count(), 1); + Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().hasUidMsb()); + + long firstLedger = ledger.getLedgersInfoAsList().get(0).getLedgerId(); + long secondLedger = ledger.getLedgersInfoAsList().get(1).getLedgerId(); + + cursor.markDelete(ledger.getLastConfirmedEntry()); + assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 1); + Assert.assertEquals(ledger.getLedgersInfoAsList().get(0).getLedgerId(), secondLedger); + + assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger)); + } + void assertEventuallyTrue(BooleanSupplier predicate) throws Exception { // wait up to 3 seconds for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) { @@ -563,11 +655,16 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { static class MockLedgerOffloader implements LedgerOffloader { ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap<Long, UUID>(); + ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap<Long, UUID>(); Set<Long> offloadedLedgers() { return offloads.keySet(); } + Set<Long> deletedOffloads() { + return deletes.keySet(); + } + @Override public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, @@ -592,6 +689,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) { CompletableFuture<Void> promise = new CompletableFuture<>(); if (offloads.remove(ledgerId, uuid)) { + deletes.put(ledgerId, uuid); promise.complete(null); } else { promise.completeExceptionally(new Exception("Not found")); -- To stop receiving notification emails like this one, please contact si...@apache.org.