This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 03ab0d7 BookKeeper ledgers cleaned up after offload (#1668) 03ab0d7 is described below commit 03ab0d78706c4b310b7b4fdfd632346463c0e928 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Sat Apr 28 01:21:00 2018 +0200 BookKeeper ledgers cleaned up after offload (#1668) This patch adds a new configuration for managed ledger, offload ledger deletion lag, which is the amount of time to wait after a ledger has been offloaded to long term storage before the ledger will be deleted from bookkeeper. Defaults to 4 hours. Master Issue: #1511 --- .../bookkeeper/mledger/ManagedLedgerConfig.java | 23 +++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 89 ++++++++--- .../bookkeeper/mledger/proto/MLDataFormats.java | 162 ++++++++++++++++++--- .../apache/bookkeeper/mledger/util/Futures.java | 2 + managed-ledger/src/main/proto/MLDataFormats.proto | 2 + .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 3 +- .../mledger/impl/OffloadLedgerDeleteTest.java | 148 +++++++++++++++++++ .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 2 +- .../apache/bookkeeper/mledger/util/MockClock.java | 50 +++++++ 9 files changed, 433 insertions(+), 48 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index f3add7c..566701c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -54,6 +54,7 @@ public class ManagedLedgerConfig { private long retentionTimeMs = 0; private long retentionSizeInMB = 0; private boolean autoSkipNonRecoverableData; + private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4); private DigestType digestType = DigestType.CRC32C; private byte[] password = "".getBytes(Charsets.UTF_8); @@ -387,6 +388,28 @@ public class 
ManagedLedgerConfig { } /** + * When a ledger is offloaded from bookkeeper storage to long-term storage, the bookkeeper ledger + * is not deleted immediately. Instead we wait for a grace period before deleting from bookkeeper. + * The offloadLedgerDeletionLag sets this grace period. + * + * @param lagTime period to wait before deleting offloaded ledgers from bookkeeper + * @param unit time unit for lagTime + */ + public ManagedLedgerConfig setOffloadLedgerDeletionLag(int lagTime, TimeUnit unit) { + this.offloadLedgerDeletionLagMs = unit.toMillis(lagTime); + return this; + } + + /** + * Number of milliseconds before an offloaded ledger will be deleted from bookkeeper. + * + * @return the offload ledger deletion lag time in milliseconds + */ + public long getOffloadLedgerDeletionLagMillis() { + return offloadLedgerDeletionLagMs; + } + + /** + * Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets * corrupted at bookkeeper and managed-cursor is stuck at that ledger. 
*/ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a3ec2e5..f1d7100 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -97,6 +97,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo; import org.apache.bookkeeper.mledger.util.CallbackMutex; @@ -1543,13 +1544,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } private void trimConsumedLedgersInBackground() { + trimConsumedLedgersInBackground(Futures.NULL_PROMISE); + } + + private void trimConsumedLedgersInBackground(CompletableFuture<?> promise) { executor.executeOrdered(name, safeRun(() -> { - internalTrimConsumedLedgers(); + internalTrimConsumedLedgers(promise); })); } - private void scheduleDeferredTrimming() { - scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground()), 100, TimeUnit.MILLISECONDS); + private void scheduleDeferredTrimming(CompletableFuture<?> promise) { + scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS); } private boolean hasLedgerRetentionExpired(long ledgerTimestamp) { @@ -1568,20 +1573,27 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { && TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 
1024 * 1024; } + private boolean isOffloadedNeedsDelete(OffloadContext offload) { + long elapsedMs = clock.millis() - offload.getTimestamp(); + return offload.getComplete() + && !offload.getBookkeeperDeleted() + && elapsedMs > config.getOffloadLedgerDeletionLagMillis(); + } + /** * Checks whether there are ledger that have been fully consumed and deletes them. * * @throws Exception */ - void internalTrimConsumedLedgers() { + void internalTrimConsumedLedgers(CompletableFuture<?> promise) { // Ensure only one trimming operation is active if (!trimmerMutex.tryLock()) { - scheduleDeferredTrimming(); + scheduleDeferredTrimming(promise); return; } List<LedgerInfo> ledgersToDelete = Lists.newArrayList(); - + List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList(); synchronized (this) { if (log.isDebugEnabled()) { log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(), @@ -1590,6 +1602,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (STATE_UPDATER.get(this) == State.Closed) { log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name); trimmerMutex.unlock(); + promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger")); return; } @@ -1604,6 +1617,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (slowestReaderPosition != null) { slowestReaderLedgerId = slowestReaderPosition.getLedgerId(); } else { + promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position")); trimmerMutex.unlock(); return; } @@ -1625,43 +1639,55 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired, overRetentionQuota, currentLedger.getId()); } - if (ls.getLedgerId() == currentLedger.getId() || (!expired && !overRetentionQuota)) { - if (log.isDebugEnabled()) { - if (!expired) { - 
log.debug("[{}] ledger id skipped for deletion as unexpired: {}", name, ls.getLedgerId()); - } - if (!overRetentionQuota) { - log.debug("[{}] ledger id: {} skipped for deletion as size: {} under quota: {} MB", name, - ls.getLedgerId(), TOTAL_SIZE_UPDATER.get(this), config.getRetentionSizeInMB()); - } - } + if (ls.getLedgerId() == currentLedger.getId()) { + log.debug("[{}] ledger id skipped for deletion as it is currently being written to", + name, ls.getLedgerId()); + break; + } else if (expired) { + log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp()); + ledgersToDelete.add(ls); + } else if (overRetentionQuota) { + log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId()); + ledgersToDelete.add(ls); + } else if (isOffloadedNeedsDelete(ls.getOffloadContext())) { + log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", + name, ls.getLedgerId()); + offloadedLedgersToDelete.add(ls); + } else { + log.debug("[{}] Nothing done for ledger {}. 
Neither expired, over-quota nor offloaded", + name, ls.getLedgerId()); break; } - - ledgersToDelete.add(ls); - ledgerCache.remove(ls.getLedgerId()); } - if (ledgersToDelete.isEmpty()) { + if (ledgersToDelete.isEmpty() && offloadedLedgersToDelete.isEmpty()) { trimmerMutex.unlock(); + promise.complete(null); return; } if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming || !ledgersListMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list - scheduleDeferredTrimming(); + scheduleDeferredTrimming(promise); trimmerMutex.unlock(); return; } // Update metadata for (LedgerInfo ls : ledgersToDelete) { + ledgerCache.remove(ls.getLedgerId()); + ledgers.remove(ls.getLedgerId()); NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); entryCache.invalidateAllEntries(ls.getLedgerId()); } + for (LedgerInfo ls : offloadedLedgersToDelete) { + LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); + newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); + ledgers.put(ls.getLedgerId(), newInfoBuilder.build()); + } if (log.isDebugEnabled()) { log.debug("[{}] Updating of ledgers list after trimming", name); @@ -1680,6 +1706,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); asyncDeleteLedger(ls.getLedgerId(), ls); } + for (LedgerInfo ls : offloadedLedgersToDelete) { + log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", + name, ls.getLedgerId(), ls.getSize()); + asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()); + } + promise.complete(null); } @Override @@ -1687,6 +1719,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.warn("[{}] Failed to update the list of ledgers after trimming", name, e); ledgersListMutex.unlock(); trimmerMutex.unlock(); + + promise.completeExceptionally(e); } 
}); } @@ -1770,8 +1804,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { + private void asyncDeleteLedgerFromBookKeeper(long ledgerId) { asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); + } + + private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { + if (!info.getOffloadContext().getBookkeeperDeleted()) { + // only delete if it hasn't been previously deleted for offload + asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); + } if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), @@ -2084,7 +2125,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }) .whenComplete((result, exception) -> { if (exception != null) { - log.warn("[{}] Failed to prepare ledger {} for offload, uuid {}", name, ledgerId, uuid); + log.warn("[{}] Failed to prepare ledger {} for offload, uuid {}", + name, ledgerId, uuid, exception); } else { log.info("[{}] Metadata prepared for offload of ledger {} with uuid {}", name, ledgerId, uuid); } @@ -2100,6 +2142,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (existingUuid.equals(uuid)) { LedgerInfo.Builder builder = oldInfo.toBuilder(); builder.getOffloadContextBuilder() + .setTimestamp(clock.millis()) .setComplete(true); return builder.build(); } else { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java index 1f0fae4..73ba1da 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java @@ -22,6 +22,14 @@ public final class MLDataFormats { // optional bool complete = 3; boolean hasComplete(); boolean getComplete(); + + // optional bool bookkeeperDeleted = 4; + 
boolean hasBookkeeperDeleted(); + boolean getBookkeeperDeleted(); + + // optional int64 timestamp = 5; + boolean hasTimestamp(); + long getTimestamp(); } public static final class OffloadContext extends com.google.protobuf.GeneratedMessage @@ -82,10 +90,32 @@ public final class MLDataFormats { return complete_; } + // optional bool bookkeeperDeleted = 4; + public static final int BOOKKEEPERDELETED_FIELD_NUMBER = 4; + private boolean bookkeeperDeleted_; + public boolean hasBookkeeperDeleted() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getBookkeeperDeleted() { + return bookkeeperDeleted_; + } + + // optional int64 timestamp = 5; + public static final int TIMESTAMP_FIELD_NUMBER = 5; + private long timestamp_; + public boolean hasTimestamp() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public long getTimestamp() { + return timestamp_; + } + private void initFields() { uidMsb_ = 0L; uidLsb_ = 0L; complete_ = false; + bookkeeperDeleted_ = false; + timestamp_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -108,6 +138,12 @@ public final class MLDataFormats { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(3, complete_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBool(4, bookkeeperDeleted_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeInt64(5, timestamp_); + } getUnknownFields().writeTo(output); } @@ -129,6 +165,14 @@ public final class MLDataFormats { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, complete_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(4, bookkeeperDeleted_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(5, timestamp_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -259,6 +303,10 
@@ public final class MLDataFormats { bitField0_ = (bitField0_ & ~0x00000002); complete_ = false; bitField0_ = (bitField0_ & ~0x00000004); + bookkeeperDeleted_ = false; + bitField0_ = (bitField0_ & ~0x00000008); + timestamp_ = 0L; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -309,6 +357,14 @@ public final class MLDataFormats { to_bitField0_ |= 0x00000004; } result.complete_ = complete_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.bookkeeperDeleted_ = bookkeeperDeleted_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.timestamp_ = timestamp_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -334,6 +390,12 @@ public final class MLDataFormats { if (other.hasComplete()) { setComplete(other.getComplete()); } + if (other.hasBookkeeperDeleted()) { + setBookkeeperDeleted(other.getBookkeeperDeleted()); + } + if (other.hasTimestamp()) { + setTimestamp(other.getTimestamp()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -380,6 +442,16 @@ public final class MLDataFormats { complete_ = input.readBool(); break; } + case 32: { + bitField0_ |= 0x00000008; + bookkeeperDeleted_ = input.readBool(); + break; + } + case 40: { + bitField0_ |= 0x00000010; + timestamp_ = input.readInt64(); + break; + } } } } @@ -449,6 +521,48 @@ public final class MLDataFormats { return this; } + // optional bool bookkeeperDeleted = 4; + private boolean bookkeeperDeleted_ ; + public boolean hasBookkeeperDeleted() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getBookkeeperDeleted() { + return bookkeeperDeleted_; + } + public Builder setBookkeeperDeleted(boolean value) { + bitField0_ |= 0x00000008; + bookkeeperDeleted_ = value; + onChanged(); + return this; + } + public Builder clearBookkeeperDeleted() { + bitField0_ = (bitField0_ & ~0x00000008); + bookkeeperDeleted_ = false; + onChanged(); + return this; + } + + // 
optional int64 timestamp = 5; + private long timestamp_ ; + public boolean hasTimestamp() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public long getTimestamp() { + return timestamp_; + } + public Builder setTimestamp(long value) { + bitField0_ |= 0x00000010; + timestamp_ = value; + onChanged(); + return this; + } + public Builder clearTimestamp() { + bitField0_ = (bitField0_ & ~0x00000010); + timestamp_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:OffloadContext) } @@ -5359,29 +5473,31 @@ public final class MLDataFormats { descriptor; static { java.lang.String[] descriptorData = { - "\n\"src/main/proto/MLDataFormats.proto\"B\n\016" + + "\n\"src/main/proto/MLDataFormats.proto\"p\n\016" + "OffloadContext\022\016\n\006uidMsb\030\001 \001(\003\022\016\n\006uidLsb" + - "\030\002 \001(\003\022\020\n\010complete\030\003 \001(\010\"\362\001\n\021ManagedLedg" + - "erInfo\0221\n\nledgerInfo\030\001 \003(\0132\035.ManagedLedg" + - "erInfo.LedgerInfo\022/\n\022terminatedPosition\030" + - "\002 \001(\0132\023.NestedPositionInfo\032y\n\nLedgerInfo" + - "\022\020\n\010ledgerId\030\001 \002(\003\022\017\n\007entries\030\002 \001(\003\022\014\n\004s" + - "ize\030\003 \001(\003\022\021\n\ttimestamp\030\004 \001(\003\022\'\n\016offloadC" + - "ontext\030\005 \001(\0132\017.OffloadContext\"\206\001\n\014Positi" + - "onInfo\022\020\n\010ledgerId\030\001 \002(\003\022\017\n\007entryId\030\002 \002(", - "\003\0220\n\031individualDeletedMessages\030\003 \003(\0132\r.M" + - "essageRange\022!\n\nproperties\030\004 \003(\0132\r.LongPr" + - "operty\"7\n\022NestedPositionInfo\022\020\n\010ledgerId" + - "\030\001 \002(\003\022\017\n\007entryId\030\002 \002(\003\"f\n\014MessageRange\022" + - "*\n\rlowerEndpoint\030\001 \002(\0132\023.NestedPositionI" + - "nfo\022*\n\rupperEndpoint\030\002 \002(\0132\023.NestedPosit" + - "ionInfo\"+\n\014LongProperty\022\014\n\004name\030\001 \002(\t\022\r\n" + - "\005value\030\002 
\002(\003\"\270\001\n\021ManagedCursorInfo\022\027\n\017cu" + - "rsorsLedgerId\030\001 \002(\003\022\032\n\022markDeleteLedgerI" + - "d\030\002 \001(\003\022\031\n\021markDeleteEntryId\030\003 \001(\003\0220\n\031in", - "dividualDeletedMessages\030\004 \003(\0132\r.MessageR" + - "ange\022!\n\nproperties\030\005 \003(\0132\r.LongPropertyB" + - "\'\n#org.apache.bookkeeper.mledger.protoH\001" + "\030\002 \001(\003\022\020\n\010complete\030\003 \001(\010\022\031\n\021bookkeeperDe" + + "leted\030\004 \001(\010\022\021\n\ttimestamp\030\005 \001(\003\"\362\001\n\021Manag" + + "edLedgerInfo\0221\n\nledgerInfo\030\001 \003(\0132\035.Manag" + + "edLedgerInfo.LedgerInfo\022/\n\022terminatedPos" + + "ition\030\002 \001(\0132\023.NestedPositionInfo\032y\n\nLedg" + + "erInfo\022\020\n\010ledgerId\030\001 \002(\003\022\017\n\007entries\030\002 \001(" + + "\003\022\014\n\004size\030\003 \001(\003\022\021\n\ttimestamp\030\004 \001(\003\022\'\n\016of" + + "floadContext\030\005 \001(\0132\017.OffloadContext\"\206\001\n\014", + "PositionInfo\022\020\n\010ledgerId\030\001 \002(\003\022\017\n\007entryI" + + "d\030\002 \002(\003\0220\n\031individualDeletedMessages\030\003 \003" + + "(\0132\r.MessageRange\022!\n\nproperties\030\004 \003(\0132\r." 
+ + "LongProperty\"7\n\022NestedPositionInfo\022\020\n\010le" + + "dgerId\030\001 \002(\003\022\017\n\007entryId\030\002 \002(\003\"f\n\014Message" + + "Range\022*\n\rlowerEndpoint\030\001 \002(\0132\023.NestedPos" + + "itionInfo\022*\n\rupperEndpoint\030\002 \002(\0132\023.Neste" + + "dPositionInfo\"+\n\014LongProperty\022\014\n\004name\030\001 " + + "\002(\t\022\r\n\005value\030\002 \002(\003\"\270\001\n\021ManagedCursorInfo" + + "\022\027\n\017cursorsLedgerId\030\001 \002(\003\022\032\n\022markDeleteL", + "edgerId\030\002 \001(\003\022\031\n\021markDeleteEntryId\030\003 \001(\003" + + "\0220\n\031individualDeletedMessages\030\004 \003(\0132\r.Me" + + "ssageRange\022!\n\nproperties\030\005 \003(\0132\r.LongPro" + + "pertyB\'\n#org.apache.bookkeeper.mledger.p" + + "rotoH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5393,7 +5509,7 @@ public final class MLDataFormats { internal_static_OffloadContext_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_OffloadContext_descriptor, - new java.lang.String[] { "UidMsb", "UidLsb", "Complete", }, + new java.lang.String[] { "UidMsb", "UidLsb", "Complete", "BookkeeperDeleted", "Timestamp", }, org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext.class, org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext.Builder.class); internal_static_ManagedLedgerInfo_descriptor = diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java index 11801bb..9fd4a07 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java @@ -29,6 +29,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; */ public class Futures { 
+ public static CompletableFuture<Void> NULL_PROMISE = CompletableFuture.completedFuture(null); + /** * Adapts a {@link CloseCallback} to a {@link CompletableFuture}. */ diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 99e26e3..29f465c 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -25,6 +25,8 @@ message OffloadContext { optional int64 uidMsb = 1; optional int64 uidLsb = 2; optional bool complete = 3; + optional bool bookkeeperDeleted = 4; + optional int64 timestamp = 5; } message ManagedLedgerInfo { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 60d66a1..d7cce97 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; @@ -1670,7 +1671,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { c1.skipEntries(1, IndividualDeletedEntries.Exclude); // let retention expire Thread.sleep(1000); - ml.internalTrimConsumedLedgers(); + ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null)); assertTrue(ml.getLedgersInfoAsList().size() <= 1); assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java new file mode 100644 index 
0000000..41a7ad0 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.util.MockClock; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase { + private static final Logger log = LoggerFactory.getLogger(OffloadLedgerDeleteTest.class); + + @Test + public void testLaggedDelete() throws Exception { + OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader(); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + MockClock clock = new MockClock(); + 
config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setOffloadLedgerDeletionLag(5, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + config.setClock(clock); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + int i = 0; + for (; i < 15; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId(); + + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId)); + + clock.advance(2, TimeUnit.MINUTES); + CompletableFuture<Void> promise = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise); + promise.join(); + Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId)); + + clock.advance(5, TimeUnit.MINUTES); + CompletableFuture<Void> promise2 = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise2); + promise2.join(); + + // assert bk ledger is deleted + assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedgerId)); + + // ledger still exists in list + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + + // move past retention, should be deleted from offloaded also + clock.advance(5, TimeUnit.MINUTES); + CompletableFuture<Void> promise3 = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise3); + promise3.join(); + + 
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1); + assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId)); + } + + @Test + public void testLaggedDeleteRetentionSetLower() throws Exception { + OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader(); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + MockClock clock = new MockClock(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(5, TimeUnit.MINUTES); + config.setOffloadLedgerDeletionLag(10, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + config.setClock(clock); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + int i = 0; + for (; i < 15; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId(); + + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId)); + + clock.advance(2, TimeUnit.MINUTES); + CompletableFuture<Void> promise = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise); + promise.join(); + Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId)); + + clock.advance(5, TimeUnit.MINUTES); + CompletableFuture<Void> promise2 = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise2); + promise2.join(); + + // ensure it gets deleted from both bookkeeper and offloader + assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedgerId)); + assertEventuallyTrue(() -> 
offloader.deletedOffloads().contains(firstLedgerId)); + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 311a82e..278182c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -633,7 +633,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger)); } - void assertEventuallyTrue(BooleanSupplier predicate) throws Exception { + static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception { // wait up to 3 seconds for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) { Thread.sleep(100); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/MockClock.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/MockClock.java new file mode 100644 index 0000000..b74bec3 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/MockClock.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class MockClock extends Clock { + private Instant initialInstant = Clock.systemUTC().instant(); + private AtomicLong delta = new AtomicLong(0); + + public void advance(long period, TimeUnit unit) { + delta.addAndGet(unit.toNanos(period)); + } + + @Override + public Instant instant() { + return initialInstant.plusNanos(delta.get()); + } + + @Override + public Clock withZone(ZoneId zone) { + return this; + } + + @Override + public ZoneId getZone() { + return ZoneId.systemDefault(); + } +} + -- To stop receiving notification emails like this one, please contact si...@apache.org.