This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6001c41  Delete offloaded ledger when ledger deleted (#1641)
6001c41 is described below

commit 6001c417c7edf5892c55bc64e299023f16a5fbd9
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Apr 25 23:33:21 2018 +0200

    Delete offloaded ledger when ledger deleted (#1641)
    
    * Delete offloaded ledger when ledger deleted
    
    When a managed ledger trims a ledger, if that ledger has been
    offloaded to long term storage, delete it from long term storage
    also.
    
    Currently, it will always try to delete the bookkeeper ledger also,
    even if the ledger has already been offloaded. Handling for this case
    will be added in a later patch along with delayed bookkeeper ledger
    deletion in the case of offload.
    
    Master Issue: #1511
    
    * Fixup for delete
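    
    In outline, the trimming path now also removes the offloaded copy. A
    minimal standalone sketch of that shape (OffloadedStore, LedgerMeta and
    deleteFromBookKeeper are hypothetical stand-ins for the ManagedLedgerImpl
    internals; only the deleteOffloaded(long, UUID) signature mirrors the
    LedgerOffloader call used by the patch):
    
        import java.util.Optional;
        import java.util.UUID;
        import java.util.concurrent.CompletableFuture;
    
        interface OffloadedStore {
            // Same shape as LedgerOffloader#deleteOffloaded(long, UUID) in the patch.
            CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid);
        }
    
        class LedgerMeta {
            final long ledgerId;
            final Optional<UUID> offloadUid; // present only if the ledger was offloaded
    
            LedgerMeta(long ledgerId, Optional<UUID> offloadUid) {
                this.ledgerId = ledgerId;
                this.offloadUid = offloadUid;
            }
        }
    
        class TrimSketch {
            // On trim: always delete the BookKeeper copy (for now), and if an
            // offload UUID is recorded, delete the long-term-storage copy too.
            static CompletableFuture<Void> deleteTrimmedLedger(LedgerMeta meta,
                                                               OffloadedStore store,
                                                               Runnable deleteFromBookKeeper) {
                deleteFromBookKeeper.run();
                return meta.offloadUid
                    .map(uid -> store.deleteOffloaded(meta.ledgerId, uid))
                    .orElseGet(() -> CompletableFuture.completedFuture(null));
            }
        }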
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++--
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 98 ++++++++++++++++++++++
 2 files changed, 110 insertions(+), 6 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index db644b2..50bc85d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1666,7 +1666,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
                     for (LedgerInfo ls : ledgersToDelete) {
                         log.info("[{}] Removing ledger {} - size: {}", name, 
ls.getLedgerId(), ls.getSize());
-                        asyncDeleteLedger(ls.getLedgerId());
+                        asyncDeleteLedger(ls.getLedgerId(), ls);
                     }
                 }
 
@@ -1758,8 +1758,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    private void asyncDeleteLedger(long ledgerId) {
+    private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
         asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+
+        if (info.getOffloadContext().hasUidMsb()) {
+            UUID uuid = new UUID(info.getOffloadContext().getUidMsb(),
+                                 info.getOffloadContext().getUidLsb());
+            cleanupOffloaded(ledgerId, uuid, "Trimming");
+        }
     }
 
     private void asyncDeleteLedger(long ledgerId, long retry) {
@@ -1963,7 +1969,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                            scheduledExecutor, name)
                             .whenComplete((ignore2, exception) -> {
                                     if (exception != null) {
-                                        cleanupOffloadedOnFailure(ledgerId, uuid, "Metastore failure");
+                                        cleanupOffloaded(ledgerId, uuid, "Metastore failure");
                                     }
                                 });
                     })
@@ -2056,7 +2062,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                                                    oldInfo.getOffloadContext().getUidLsb());
                                           log.info("[{}] Found previous offload attempt for ledger {}, uuid {}"
                                                    + ", cleaning up", name, ledgerId, uuid);
-                                           cleanupOffloadedOnFailure(ledgerId, oldUuid, "Previous failed offload");
+                                           cleanupOffloaded(ledgerId, oldUuid, "Previous failed offload");
                                        }
                                        LedgerInfo.Builder builder = oldInfo.toBuilder();
                                        builder.getOffloadContextBuilder()
@@ -2078,7 +2084,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return transformLedgerInfo(ledgerId,
                                    (oldInfo) -> {
                                        UUID existingUuid = new UUID(oldInfo.getOffloadContext().getUidMsb(),
-                                                               oldInfo.getOffloadContext().getUidLsb());
+                                                                    oldInfo.getOffloadContext().getUidLsb());
                                        if (existingUuid.equals(uuid)) {
                                            LedgerInfo.Builder builder = oldInfo.toBuilder();
                                            builder.getOffloadContextBuilder()
@@ -2101,7 +2107,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 });
     }
 
-    private void cleanupOffloadedOnFailure(long ledgerId, UUID uuid, String cleanupReason) {
+    private void cleanupOffloaded(long ledgerId, UUID uuid, String cleanupReason) {
         Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10),
                     Retries.NonFatalPredicate,
                     () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid),
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 230456d..311a82e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -541,6 +541,98 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
     }
 
+    @Test
+    public void testOffloadDelete() throws Exception {
+        Set<Pair<Long, UUID>> deleted = ConcurrentHashMap.newKeySet();
+        CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>();
+        Set<Pair<Long, UUID>> failedOffloads = ConcurrentHashMap.newKeySet();
+
+        MockLedgerOffloader offloader = new MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(0, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedCursor cursor = ledger.openCursor("foobar");
+        for (int i = 0; i < 15; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> e.getOffloadContext().getComplete()).count(), 1);
+        Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
+        long firstLedger = ledger.getLedgersInfoAsList().get(0).getLedgerId();
+        long secondLedger = ledger.getLedgersInfoAsList().get(1).getLedgerId();
+
+        cursor.markDelete(ledger.getLastConfirmedEntry());
+        assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 1);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().get(0).getLedgerId(), secondLedger);
+
+        assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger));
+    }
+
+    @Test
+    public void testOffloadDeleteIncomplete() throws Exception {
+        Set<Pair<Long, UUID>> deleted = ConcurrentHashMap.newKeySet();
+        CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>();
+        Set<Pair<Long, UUID>> failedOffloads = ConcurrentHashMap.newKeySet();
+
+        MockLedgerOffloader offloader = new MockLedgerOffloader() {
+                @Override
+                public CompletableFuture<Void> offload(ReadHandle ledger,
+                                                       UUID uuid,
+                                                       Map<String, String> extraMetadata) {
+                    return super.offload(ledger, uuid, extraMetadata)
+                        .thenCompose((res) -> {
+                                CompletableFuture<Void> f = new CompletableFuture<>();
+                                f.completeExceptionally(new Exception("Fail after offload occurred"));
+                                return f;
+                            });
+                }
+            };
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(0, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedCursor cursor = ledger.openCursor("foobar");
+        for (int i = 0; i < 15; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        try {
+            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+        } catch (ManagedLedgerException mle) {
+            // expected
+        }
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> e.getOffloadContext().getComplete()).count(), 0);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> e.getOffloadContext().hasUidMsb()).count(), 1);
+        Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().hasUidMsb());
+
+        long firstLedger = ledger.getLedgersInfoAsList().get(0).getLedgerId();
+        long secondLedger = ledger.getLedgersInfoAsList().get(1).getLedgerId();
+
+        cursor.markDelete(ledger.getLastConfirmedEntry());
+        assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 1);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().get(0).getLedgerId(), secondLedger);
+
+        assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger));
+    }
+
     void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
         // wait up to 3 seconds
         for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
@@ -563,11 +655,16 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
     static class MockLedgerOffloader implements LedgerOffloader {
         ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap<Long, UUID>();
+        ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap<Long, UUID>();
 
         Set<Long> offloadedLedgers() {
             return offloads.keySet();
         }
 
+        Set<Long> deletedOffloads() {
+            return deletes.keySet();
+        }
+
         @Override
         public CompletableFuture<Void> offload(ReadHandle ledger,
                                                UUID uuid,
@@ -592,6 +689,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) {
             CompletableFuture<Void> promise = new CompletableFuture<>();
             if (offloads.remove(ledgerId, uuid)) {
+                deletes.put(ledgerId, uuid);
                 promise.complete(null);
             } else {
                 promise.completeExceptionally(new Exception("Not found"));

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.
