This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03ab0d7  BookKeeper ledgers cleaned up after offload (#1668)
03ab0d7 is described below

commit 03ab0d78706c4b310b7b4fdfd632346463c0e928
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Sat Apr 28 01:21:00 2018 +0200

    BookKeeper ledgers cleaned up after offload (#1668)
    
    This patch adds a new configuration for managed ledger, offload ledger
    deletion lag, which is the amount of time to wait after a ledger has
    been offloaded to long term storage before the ledger will be deleted
    from bookkeeper. Defaults to 4 hours.
    
    Master Issue: #1511
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  23 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  89 ++++++++---
 .../bookkeeper/mledger/proto/MLDataFormats.java    | 162 ++++++++++++++++++---
 .../apache/bookkeeper/mledger/util/Futures.java    |   2 +
 managed-ledger/src/main/proto/MLDataFormats.proto  |   2 +
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |   3 +-
 .../mledger/impl/OffloadLedgerDeleteTest.java      | 148 +++++++++++++++++++
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |   2 +-
 .../apache/bookkeeper/mledger/util/MockClock.java  |  50 +++++++
 9 files changed, 433 insertions(+), 48 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index f3add7c..566701c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -54,6 +54,7 @@ public class ManagedLedgerConfig {
     private long retentionTimeMs = 0;
     private long retentionSizeInMB = 0;
     private boolean autoSkipNonRecoverableData;
+    private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
 
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
@@ -387,6 +388,28 @@ public class ManagedLedgerConfig {
     }
 
     /**
+     * When a ledger is offloaded from bookkeeper storage to longterm storage, 
the bookkeeper ledger
+     * is not deleted immediately. Instead we wait for a grace period before 
deleting from bookkeeper.
+     * The offloadLedgerDeleteLag sets this grace period.
+     *
+     * @param lagTime period to wait before deleting offloaded ledgers from 
bookkeeper
+     * @param unit timeunit for lagTime
+     */
+    public ManagedLedgerConfig setOffloadLedgerDeletionLag(int lagTime, 
TimeUnit unit) {
+        this.offloadLedgerDeletionLagMs = unit.toMillis(lagTime);
+        return this;
+    }
+
+    /**
+     * Number of milliseconds before an offloaded ledger will be deleted from 
bookkeeper.
+     *
+     * @return the offload ledger deletion lag time in milliseconds
+     */
+    public long getOffloadLedgerDeletionLagMillis() {
+        return offloadLedgerDeletionLagMs;
+    }
+
+    /**
      * Skip reading non-recoverable/unreadable data-ledger under 
managed-ledger's list. It helps when data-ledgers gets
      * corrupted at bookkeeper and managed-cursor is stuck at that ledger.
      */
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a3ec2e5..f1d7100 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -97,6 +97,7 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
 import org.apache.bookkeeper.mledger.util.CallbackMutex;
@@ -1543,13 +1544,17 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
     }
 
     private void trimConsumedLedgersInBackground() {
+        trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+    }
+
+    private void trimConsumedLedgersInBackground(CompletableFuture<?> promise) 
{
         executor.executeOrdered(name, safeRun(() -> {
-            internalTrimConsumedLedgers();
+                    internalTrimConsumedLedgers(promise);
         }));
     }
 
-    private void scheduleDeferredTrimming() {
-        scheduledExecutor.schedule(safeRun(() -> 
trimConsumedLedgersInBackground()), 100, TimeUnit.MILLISECONDS);
+    private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
+        scheduledExecutor.schedule(safeRun(() -> 
trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS);
     }
 
     private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
@@ -1568,20 +1573,27 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                 && TOTAL_SIZE_UPDATER.get(this) > ((long) 
config.getRetentionSizeInMB()) * 1024 * 1024;
     }
 
+    private boolean isOffloadedNeedsDelete(OffloadContext offload) {
+        long elapsedMs = clock.millis() - offload.getTimestamp();
+        return offload.getComplete()
+            && !offload.getBookkeeperDeleted()
+            && elapsedMs > config.getOffloadLedgerDeletionLagMillis();
+    }
+
     /**
      * Checks whether there are ledger that have been fully consumed and 
deletes them.
      *
      * @throws Exception
      */
-    void internalTrimConsumedLedgers() {
+    void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
         // Ensure only one trimming operation is active
         if (!trimmerMutex.tryLock()) {
-            scheduleDeferredTrimming();
+            scheduleDeferredTrimming(promise);
             return;
         }
 
         List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
-
+        List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
         synchronized (this) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Start TrimConsumedLedgers. ledgers={} 
totalSize={}", name, ledgers.keySet(),
@@ -1590,6 +1602,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             if (STATE_UPDATER.get(this) == State.Closed) {
                 log.debug("[{}] Ignoring trimming request since the managed 
ledger was already closed", name);
                 trimmerMutex.unlock();
+                promise.completeExceptionally(new 
ManagedLedgerAlreadyClosedException("Can't trim closed ledger"));
                 return;
             }
 
@@ -1604,6 +1617,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 if (slowestReaderPosition != null) {
                     slowestReaderLedgerId = 
slowestReaderPosition.getLedgerId();
                 } else {
+                    promise.completeExceptionally(new 
ManagedLedgerException("Couldn't find reader position"));
                     trimmerMutex.unlock();
                     return;
                 }
@@ -1625,43 +1639,55 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                         name, ls.getLedgerId(), (clock.millis() - 
ls.getTimestamp()) / 1000.0, expired,
                         overRetentionQuota, currentLedger.getId());
                 }
-                if (ls.getLedgerId() == currentLedger.getId() || (!expired && 
!overRetentionQuota)) {
-                    if (log.isDebugEnabled()) {
-                        if (!expired) {
-                            log.debug("[{}] ledger id skipped for deletion as 
unexpired: {}", name, ls.getLedgerId());
-                        }
-                        if (!overRetentionQuota) {
-                            log.debug("[{}] ledger id: {} skipped for deletion 
as size: {} under quota: {} MB", name,
-                                    ls.getLedgerId(), 
TOTAL_SIZE_UPDATER.get(this), config.getRetentionSizeInMB());
-                        }
-                    }
+                if (ls.getLedgerId() == currentLedger.getId()) {
+                    log.debug("[{}] ledger id skipped for deletion as it is 
currently being written to",
+                              name, ls.getLedgerId());
+                    break;
+                } else if (expired) {
+                    log.debug("[{}] Ledger {} has expired, ts {}", name, 
ls.getLedgerId(), ls.getTimestamp());
+                    ledgersToDelete.add(ls);
+                } else if (overRetentionQuota) {
+                    log.debug("[{}] Ledger {} is over quota", name, 
ls.getLedgerId());
+                    ledgersToDelete.add(ls);
+                } else if (isOffloadedNeedsDelete(ls.getOffloadContext())) {
+                    log.debug("[{}] Ledger {} has been offloaded, bookkeeper 
ledger needs to be deleted",
+                              name, ls.getLedgerId());
+                    offloadedLedgersToDelete.add(ls);
+                } else {
+                    log.debug("[{}] Nothing done for ledger {}. Neither 
expired, over-quota nor offloaded",
+                              name, ls.getLedgerId());
                     break;
                 }
-
-                ledgersToDelete.add(ls);
-                ledgerCache.remove(ls.getLedgerId());
             }
 
-            if (ledgersToDelete.isEmpty()) {
+            if (ledgersToDelete.isEmpty() && 
offloadedLedgersToDelete.isEmpty()) {
                 trimmerMutex.unlock();
+                promise.complete(null);
                 return;
             }
 
             if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now 
and schedule a new trimming
                     || !ledgersListMutex.tryLock()) { // Avoid deadlocks with 
other operations updating the ledgers list
-                scheduleDeferredTrimming();
+                scheduleDeferredTrimming(promise);
                 trimmerMutex.unlock();
                 return;
             }
 
             // Update metadata
             for (LedgerInfo ls : ledgersToDelete) {
+                ledgerCache.remove(ls.getLedgerId());
+
                 ledgers.remove(ls.getLedgerId());
                 NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
                 TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
 
                 entryCache.invalidateAllEntries(ls.getLedgerId());
             }
+            for (LedgerInfo ls : offloadedLedgersToDelete) {
+                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
+                
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
+            }
 
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Updating of ledgers list after trimming", 
name);
@@ -1680,6 +1706,12 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                         log.info("[{}] Removing ledger {} - size: {}", name, 
ls.getLedgerId(), ls.getSize());
                         asyncDeleteLedger(ls.getLedgerId(), ls);
                     }
+                    for (LedgerInfo ls : offloadedLedgersToDelete) {
+                        log.info("[{}] Deleting offloaded ledger {} from 
bookkeeper - size: {}",
+                                 name, ls.getLedgerId(), ls.getSize());
+                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
+                    }
+                    promise.complete(null);
                 }
 
                 @Override
@@ -1687,6 +1719,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     log.warn("[{}] Failed to update the list of ledgers after 
trimming", name, e);
                     ledgersListMutex.unlock();
                     trimmerMutex.unlock();
+
+                    promise.completeExceptionally(e);
                 }
             });
         }
@@ -1770,8 +1804,15 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
-    private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
+    private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
         asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+    }
+
+    private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
+        if (!info.getOffloadContext().getBookkeeperDeleted()) {
+            // only delete if it hasn't been previously deleted for offload
+            asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+        }
 
         if (info.getOffloadContext().hasUidMsb()) {
             UUID uuid = new UUID(info.getOffloadContext().getUidMsb(),
@@ -2084,7 +2125,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                    })
             .whenComplete((result, exception) -> {
                     if (exception != null) {
-                        log.warn("[{}] Failed to prepare ledger {} for 
offload, uuid {}", name, ledgerId, uuid);
+                        log.warn("[{}] Failed to prepare ledger {} for 
offload, uuid {}",
+                                 name, ledgerId, uuid, exception);
                     } else {
                         log.info("[{}] Metadata prepared for offload of ledger 
{} with uuid {}", name, ledgerId, uuid);
                     }
@@ -2100,6 +2142,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                        if (existingUuid.equals(uuid)) {
                                            LedgerInfo.Builder builder = 
oldInfo.toBuilder();
                                            builder.getOffloadContextBuilder()
+                                               .setTimestamp(clock.millis())
                                                .setComplete(true);
                                            return builder.build();
                                        } else {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
index 1f0fae4..73ba1da 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
@@ -22,6 +22,14 @@ public final class MLDataFormats {
     // optional bool complete = 3;
     boolean hasComplete();
     boolean getComplete();
+    
+    // optional bool bookkeeperDeleted = 4;
+    boolean hasBookkeeperDeleted();
+    boolean getBookkeeperDeleted();
+    
+    // optional int64 timestamp = 5;
+    boolean hasTimestamp();
+    long getTimestamp();
   }
   public static final class OffloadContext extends
       com.google.protobuf.GeneratedMessage
@@ -82,10 +90,32 @@ public final class MLDataFormats {
       return complete_;
     }
     
+    // optional bool bookkeeperDeleted = 4;
+    public static final int BOOKKEEPERDELETED_FIELD_NUMBER = 4;
+    private boolean bookkeeperDeleted_;
+    public boolean hasBookkeeperDeleted() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public boolean getBookkeeperDeleted() {
+      return bookkeeperDeleted_;
+    }
+    
+    // optional int64 timestamp = 5;
+    public static final int TIMESTAMP_FIELD_NUMBER = 5;
+    private long timestamp_;
+    public boolean hasTimestamp() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public long getTimestamp() {
+      return timestamp_;
+    }
+    
     private void initFields() {
       uidMsb_ = 0L;
       uidLsb_ = 0L;
       complete_ = false;
+      bookkeeperDeleted_ = false;
+      timestamp_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -108,6 +138,12 @@ public final class MLDataFormats {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBool(3, complete_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBool(4, bookkeeperDeleted_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(5, timestamp_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -129,6 +165,14 @@ public final class MLDataFormats {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(3, complete_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(4, bookkeeperDeleted_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(5, timestamp_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -259,6 +303,10 @@ public final class MLDataFormats {
         bitField0_ = (bitField0_ & ~0x00000002);
         complete_ = false;
         bitField0_ = (bitField0_ & ~0x00000004);
+        bookkeeperDeleted_ = false;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        timestamp_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
       
@@ -309,6 +357,14 @@ public final class MLDataFormats {
           to_bitField0_ |= 0x00000004;
         }
         result.complete_ = complete_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.bookkeeperDeleted_ = bookkeeperDeleted_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.timestamp_ = timestamp_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -334,6 +390,12 @@ public final class MLDataFormats {
         if (other.hasComplete()) {
           setComplete(other.getComplete());
         }
+        if (other.hasBookkeeperDeleted()) {
+          setBookkeeperDeleted(other.getBookkeeperDeleted());
+        }
+        if (other.hasTimestamp()) {
+          setTimestamp(other.getTimestamp());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -380,6 +442,16 @@ public final class MLDataFormats {
               complete_ = input.readBool();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              bookkeeperDeleted_ = input.readBool();
+              break;
+            }
+            case 40: {
+              bitField0_ |= 0x00000010;
+              timestamp_ = input.readInt64();
+              break;
+            }
           }
         }
       }
@@ -449,6 +521,48 @@ public final class MLDataFormats {
         return this;
       }
       
+      // optional bool bookkeeperDeleted = 4;
+      private boolean bookkeeperDeleted_ ;
+      public boolean hasBookkeeperDeleted() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public boolean getBookkeeperDeleted() {
+        return bookkeeperDeleted_;
+      }
+      public Builder setBookkeeperDeleted(boolean value) {
+        bitField0_ |= 0x00000008;
+        bookkeeperDeleted_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearBookkeeperDeleted() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        bookkeeperDeleted_ = false;
+        onChanged();
+        return this;
+      }
+      
+      // optional int64 timestamp = 5;
+      private long timestamp_ ;
+      public boolean hasTimestamp() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public long getTimestamp() {
+        return timestamp_;
+      }
+      public Builder setTimestamp(long value) {
+        bitField0_ |= 0x00000010;
+        timestamp_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearTimestamp() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        timestamp_ = 0L;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:OffloadContext)
     }
     
@@ -5359,29 +5473,31 @@ public final class MLDataFormats {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\"src/main/proto/MLDataFormats.proto\"B\n\016" +
+      "\n\"src/main/proto/MLDataFormats.proto\"p\n\016" +
       "OffloadContext\022\016\n\006uidMsb\030\001 
\001(\003\022\016\n\006uidLsb" +
-      "\030\002 \001(\003\022\020\n\010complete\030\003 
\001(\010\"\362\001\n\021ManagedLedg" +
-      "erInfo\0221\n\nledgerInfo\030\001 \003(\0132\035.ManagedLedg" +
-      "erInfo.LedgerInfo\022/\n\022terminatedPosition\030" +
-      "\002 \001(\0132\023.NestedPositionInfo\032y\n\nLedgerInfo" +
-      "\022\020\n\010ledgerId\030\001 \002(\003\022\017\n\007entries\030\002 
\001(\003\022\014\n\004s" +
-      "ize\030\003 \001(\003\022\021\n\ttimestamp\030\004 
\001(\003\022\'\n\016offloadC" +
-      "ontext\030\005 \001(\0132\017.OffloadContext\"\206\001\n\014Positi" +
-      "onInfo\022\020\n\010ledgerId\030\001 
\002(\003\022\017\n\007entryId\030\002 \002(",
-      "\003\0220\n\031individualDeletedMessages\030\003 \003(\0132\r.M" +
-      "essageRange\022!\n\nproperties\030\004 \003(\0132\r.LongPr" +
-      "operty\"7\n\022NestedPositionInfo\022\020\n\010ledgerId" +
-      "\030\001 \002(\003\022\017\n\007entryId\030\002 
\002(\003\"f\n\014MessageRange\022" +
-      "*\n\rlowerEndpoint\030\001 \002(\0132\023.NestedPositionI" +
-      "nfo\022*\n\rupperEndpoint\030\002 \002(\0132\023.NestedPosit" +
-      "ionInfo\"+\n\014LongProperty\022\014\n\004name\030\001 \002(\t\022\r\n" 
+
-      "\005value\030\002 
\002(\003\"\270\001\n\021ManagedCursorInfo\022\027\n\017cu" +
-      "rsorsLedgerId\030\001 \002(\003\022\032\n\022markDeleteLedgerI" +
-      "d\030\002 \001(\003\022\031\n\021markDeleteEntryId\030\003 
\001(\003\0220\n\031in",
-      "dividualDeletedMessages\030\004 \003(\0132\r.MessageR" +
-      "ange\022!\n\nproperties\030\005 \003(\0132\r.LongPropertyB" +
-      "\'\n#org.apache.bookkeeper.mledger.protoH\001"
+      "\030\002 \001(\003\022\020\n\010complete\030\003 
\001(\010\022\031\n\021bookkeeperDe" +
+      "leted\030\004 \001(\010\022\021\n\ttimestamp\030\005 
\001(\003\"\362\001\n\021Manag" +
+      "edLedgerInfo\0221\n\nledgerInfo\030\001 \003(\0132\035.Manag" +
+      "edLedgerInfo.LedgerInfo\022/\n\022terminatedPos" +
+      "ition\030\002 \001(\0132\023.NestedPositionInfo\032y\n\nLedg" +
+      "erInfo\022\020\n\010ledgerId\030\001 
\002(\003\022\017\n\007entries\030\002 \001(" +
+      "\003\022\014\n\004size\030\003 \001(\003\022\021\n\ttimestamp\030\004 
\001(\003\022\'\n\016of" +
+      "floadContext\030\005 \001(\0132\017.OffloadContext\"\206\001\n\014",
+      "PositionInfo\022\020\n\010ledgerId\030\001 
\002(\003\022\017\n\007entryI" +
+      "d\030\002 \002(\003\0220\n\031individualDeletedMessages\030\003 \003" +
+      "(\0132\r.MessageRange\022!\n\nproperties\030\004 \003(\0132\r." +
+      "LongProperty\"7\n\022NestedPositionInfo\022\020\n\010le" +
+      "dgerId\030\001 \002(\003\022\017\n\007entryId\030\002 
\002(\003\"f\n\014Message" +
+      "Range\022*\n\rlowerEndpoint\030\001 \002(\0132\023.NestedPos" +
+      "itionInfo\022*\n\rupperEndpoint\030\002 \002(\0132\023.Neste" +
+      "dPositionInfo\"+\n\014LongProperty\022\014\n\004name\030\001 " +
+      "\002(\t\022\r\n\005value\030\002 
\002(\003\"\270\001\n\021ManagedCursorInfo" +
+      "\022\027\n\017cursorsLedgerId\030\001 
\002(\003\022\032\n\022markDeleteL",
+      "edgerId\030\002 \001(\003\022\031\n\021markDeleteEntryId\030\003 
\001(\003" +
+      "\0220\n\031individualDeletedMessages\030\004 \003(\0132\r.Me" +
+      "ssageRange\022!\n\nproperties\030\005 \003(\0132\r.LongPro" +
+      "pertyB\'\n#org.apache.bookkeeper.mledger.p" +
+      "rotoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5393,7 +5509,7 @@ public final class MLDataFormats {
           internal_static_OffloadContext_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_OffloadContext_descriptor,
-              new java.lang.String[] { "UidMsb", "UidLsb", "Complete", },
+              new java.lang.String[] { "UidMsb", "UidLsb", "Complete", 
"BookkeeperDeleted", "Timestamp", },
               
org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext.class,
               
org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext.Builder.class);
           internal_static_ManagedLedgerInfo_descriptor =
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
index 11801bb..9fd4a07 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
@@ -29,6 +29,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
  */
 public class Futures {
 
+    public static CompletableFuture<Void> NULL_PROMISE = 
CompletableFuture.completedFuture(null);
+
     /**
      * Adapts a {@link CloseCallback} to a {@link CompletableFuture}.
      */
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto 
b/managed-ledger/src/main/proto/MLDataFormats.proto
index 99e26e3..29f465c 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -25,6 +25,8 @@ message OffloadContext {
     optional int64 uidMsb = 1;
     optional int64 uidLsb = 2;
     optional bool complete = 3;
+    optional bool bookkeeperDeleted = 4;
+    optional int64 timestamp = 5;
 }
 
 message ManagedLedgerInfo {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 60d66a1..d7cce97 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Executor;
@@ -1670,7 +1671,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         // let retention expire
         Thread.sleep(1000);
-        ml.internalTrimConsumedLedgers();
+        
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
 
         assertTrue(ml.getLedgersInfoAsList().size() <= 1);
         assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
new file mode 100644
index 0000000..41a7ad0
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static 
org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.util.MockClock;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
+    private static final Logger log = 
LoggerFactory.getLogger(OffloadLedgerDeleteTest.class);
+
+    @Test
+    public void testLaggedDelete() throws Exception {
+        OffloadPrefixTest.MockLedgerOffloader offloader = new 
OffloadPrefixTest.MockLedgerOffloader();
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        MockClock clock = new MockClock();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setOffloadLedgerDeletionLag(5, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+        config.setClock(clock);
+
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        int i = 0;
+        for (; i < 15; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        long firstLedgerId = 
ledger.getLedgersInfoAsList().get(0).getLedgerId();
+
+        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> e.getOffloadContext().getComplete())
+                            .map(e -> 
e.getLedgerId()).collect(Collectors.toSet()),
+                            offloader.offloadedLedgers());
+        Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId));
+
+        clock.advance(2, TimeUnit.MINUTES);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+        Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId));
+
+        clock.advance(5, TimeUnit.MINUTES);
+        CompletableFuture<Void> promise2 = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise2);
+        promise2.join();
+
+        // assert bk ledger is deleted
+        assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedgerId));
+
+        // ledger still exists in list
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> e.getOffloadContext().getComplete())
+                            .map(e -> 
e.getLedgerId()).collect(Collectors.toSet()),
+                            offloader.offloadedLedgers());
+
+        // move past retention, should be deleted from offloaded also
+        clock.advance(5, TimeUnit.MINUTES);
+        CompletableFuture<Void> promise3 = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise3);
+        promise3.join();
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+        assertEventuallyTrue(() -> 
offloader.deletedOffloads().contains(firstLedgerId));
+    }
+
+    @Test
+    public void testLaggedDeleteRetentionSetLower() throws Exception {
+        OffloadPrefixTest.MockLedgerOffloader offloader = new 
OffloadPrefixTest.MockLedgerOffloader();
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        MockClock clock = new MockClock();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(5, TimeUnit.MINUTES);
+        config.setOffloadLedgerDeletionLag(10, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+        config.setClock(clock);
+
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        int i = 0;
+        for (; i < 15; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        long firstLedgerId = 
ledger.getLedgersInfoAsList().get(0).getLedgerId();
+
+        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> e.getOffloadContext().getComplete())
+                            .map(e -> 
e.getLedgerId()).collect(Collectors.toSet()),
+                            offloader.offloadedLedgers());
+        Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId));
+
+        clock.advance(2, TimeUnit.MINUTES);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+        Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId));
+
+        clock.advance(5, TimeUnit.MINUTES);
+        CompletableFuture<Void> promise2 = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise2);
+        promise2.join();
+
+        // ensure it gets deleted from both bookkeeper and offloader
+        assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedgerId));
+        assertEventuallyTrue(() -> 
offloader.deletedOffloads().contains(firstLedgerId));
+    }
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 311a82e..278182c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -633,7 +633,7 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
         assertEventuallyTrue(() -> 
offloader.deletedOffloads().contains(firstLedger));
     }
 
-    void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
+    static void assertEventuallyTrue(BooleanSupplier predicate) throws 
Exception {
         // wait up to 3 seconds
         for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
             Thread.sleep(100);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/MockClock.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/MockClock.java
new file mode 100644
index 0000000..b74bec3
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/MockClock.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MockClock extends Clock {
+    private Instant initialInstant = Clock.systemUTC().instant();
+    private AtomicLong delta = new AtomicLong(0);
+
+    public void advance(long period, TimeUnit unit) {
+        delta.addAndGet(unit.toNanos(period));
+    }
+
+    @Override
+    public Instant instant() {
+        return initialInstant.plusNanos(delta.get());
+    }
+
+    @Override
+    public Clock withZone(ZoneId zone) {
+        return this;
+    }
+
+    @Override
+    public ZoneId getZone() {
+        return ZoneId.systemDefault();
+    }
+}
+

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to