tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991831479
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean
isTruncate, CompletableFuture<?> p
}
private void maybeOffloadInBackground(CompletableFuture<PositionImpl>
promise) {
- if (config.getLedgerOffloader() != null
- && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
- && config.getLedgerOffloader().getOffloadPolicies() != null
- &&
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
!= null
- &&
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
>= 0) {
+ if (config.getLedgerOffloader() == null || config.getLedgerOffloader()
== NullLedgerOffloader.INSTANCE
+ || config.getLedgerOffloader().getOffloadPolicies() == null) {
+ return;
+ }
+
+ final OffloadPoliciesImpl policies =
config.getLedgerOffloader().getOffloadPolicies();
+ final long offloadThresholdInBytes =
+
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+ final long offloadThresholdInSeconds =
+
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+ if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
executor.executeOrdered(name, safeRun(() ->
maybeOffload(promise)));
}
}
private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
if (!offloadMutex.tryLock()) {
scheduledExecutor.schedule(safeRun(() ->
maybeOffloadInBackground(finalPromise)),
- 100, TimeUnit.MILLISECONDS);
- } else {
- CompletableFuture<PositionImpl> unlockingPromise = new
CompletableFuture<>();
- unlockingPromise.whenComplete((res, ex) -> {
- offloadMutex.unlock();
- if (ex != null) {
- finalPromise.completeExceptionally(ex);
- } else {
- finalPromise.complete(res);
- }
- });
+ 100, TimeUnit.MILLISECONDS);
+ return;
+ }
- if (config.getLedgerOffloader() != null
- && config.getLedgerOffloader() !=
NullLedgerOffloader.INSTANCE
- && config.getLedgerOffloader().getOffloadPolicies() != null
- &&
config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
- != null) {
- long threshold =
config.getLedgerOffloader().getOffloadPolicies()
- .getManagedLedgerOffloadThresholdInBytes();
-
- long sizeSummed = 0;
- long alreadyOffloadedSize = 0;
- long toOffloadSize = 0;
-
- ConcurrentLinkedDeque<LedgerInfo> toOffload = new
ConcurrentLinkedDeque<>();
-
- // go through ledger list from newest to oldest and build a
list to offload in oldest to newest order
- for (Map.Entry<Long, LedgerInfo> e :
ledgers.descendingMap().entrySet()) {
- long size = e.getValue().getSize();
- sizeSummed += size;
- boolean alreadyOffloaded = e.getValue().hasOffloadContext()
- && e.getValue().getOffloadContext().getComplete();
- if (alreadyOffloaded) {
- alreadyOffloadedSize += size;
- } else if (sizeSummed > threshold) {
- toOffloadSize += size;
- toOffload.addFirst(e.getValue());
- }
- }
+ CompletableFuture<PositionImpl> unlockingPromise = new
CompletableFuture<>();
+ unlockingPromise.whenComplete((res, ex) -> {
+ offloadMutex.unlock();
+ if (ex != null) {
+ finalPromise.completeExceptionally(ex);
+ } else {
+ finalPromise.complete(res);
+ }
+ });
- if (toOffload.size() > 0) {
- log.info("[{}] Going to automatically offload ledgers {}"
- + ", total size = {}, already offloaded =
{}, to offload = {}",
- name,
toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
- sizeSummed, alreadyOffloadedSize, toOffloadSize);
- offloadLoop(unlockingPromise, toOffload,
PositionImpl.LATEST, Optional.empty());
- } else {
- // offloadLoop will complete immediately with an empty
list to offload
- log.debug("[{}] Nothing to offload, total size = {},
already offloaded = {}, threshold = {}",
- name, sizeSummed, alreadyOffloadedSize, threshold);
- unlockingPromise.complete(PositionImpl.LATEST);
+ if (config.getLedgerOffloader() == null || config.getLedgerOffloader()
== NullLedgerOffloader.INSTANCE
+ || config.getLedgerOffloader().getOffloadPolicies() == null) {
+ log.debug("[{}] Nothing to offload due to offloader or
offloadPolicies is NULL", name);
+ finalPromise.complete(PositionImpl.LATEST);
+ return;
+ }
+
+ long sizeSummed = 0;
+ long toOffloadSize = 0;
+ long alreadyOffloadedSize = 0;
+ ConcurrentLinkedDeque<LedgerInfo> toOffload = new
ConcurrentLinkedDeque<>();
+
+ final OffloadPoliciesImpl policies =
config.getLedgerOffloader().getOffloadPolicies();
+ final long offloadThresholdInBytes =
+
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+ final long offloadTimeThresholdMillis =
+
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v
-> v >= 0)
+ .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+ //Skip the following steps if `offloadTimeThreshold` and
`offloadThresholdInBytes` are null OR negative values.
+ if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+ log.debug("[{}] Nothing to offload due to
[managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+ + "and [managedLedgerOffloadThresholdInSeconds] are null
OR negative values.", name);
+ unlockingPromise.complete(PositionImpl.LATEST);
+ return;
+ }
+
+ for (Map.Entry<Long, LedgerInfo> e :
ledgers.descendingMap().entrySet()) {
+ final LedgerInfo info = e.getValue();
+ // Skip current active ledger, an active ledger can't be offloaded.
+ // Can't `info.getLedgerId() == currentLedger.getId()` here,
trigger offloading is before create ledger.
+ if (info.getTimestamp() == 0L) {
+ continue;
+ }
+
+ final long size = info.getSize();
+ final long timestamp = info.getTimestamp();
+ final long now = System.currentTimeMillis();
+ sizeSummed += size;
+
+ final boolean alreadyOffloaded = info.hasOffloadContext() &&
info.getOffloadContext().getComplete();
+ if (alreadyOffloaded) {
+ alreadyOffloadedSize += size;
+ } else {
+ if ((offloadThresholdInBytes >= 0 && sizeSummed >
offloadThresholdInBytes)
Review Comment:
I also confused here, because `sizeSummed` also includes the ledger which
already offloaded. Maybe we need to fix it with another PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]