This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 051c676dc0d48bf4d5a99077cccf9cf87e23cdee Author: Enrico Olivelli <[email protected]> AuthorDate: Mon Jun 2 09:41:46 2025 +0200 [improve][offloaders] Automatically evict Offloaded Ledgers from memory (#19783) Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit a1a2b363cfaa1bbc38933a742484a70a0a56e761) --- conf/broker.conf | 4 + .../bookkeeper/mledger/ManagedLedgerConfig.java | 9 ++ .../bookkeeper/mledger/OffloadedLedgerHandle.java | 29 ++++++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 74 ++++++++++++++- .../impl/OffloadEvictUnusedLedgersTest.java | 104 +++++++++++++++++++++ .../mledger/impl/OffloadPrefixReadTest.java | 20 +++- .../apache/pulsar/broker/ServiceConfiguration.java | 8 ++ .../pulsar/broker/service/BrokerService.java | 4 + .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 17 +++- 9 files changed, 261 insertions(+), 8 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 0a8eddbb3c7..b60870f109f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1212,6 +1212,10 @@ managedLedgerMaxLedgerRolloverTimeMinutes=240 # Disable rollover with value 0 (Default value 0) managedLedgerInactiveLedgerRolloverTimeSeconds=0 +# Time to evict inactive offloaded ledger for inactive topic +# Disable eviction with value 0 +managedLedgerInactiveOffloadedLedgerEvictionTimeSeconds=600 + # Maximum ledger size before triggering a rollover for a topic (MB) managedLedgerMaxSizePerLedgerMbytes=2048 diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 721654b0529..89cc7e4fde4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -79,6 +79,7 @@ public class ManagedLedgerConfig { private ManagedLedgerInterceptor managedLedgerInterceptor; private Map<String, 
String> properties; private int inactiveLedgerRollOverTimeMs = 0; + private long inactiveOffloadedLedgerEvictionTimeMs = 0; @Getter @Setter private boolean cacheEvictionByMarkDeletedPosition = false; @@ -701,6 +702,14 @@ public class ManagedLedgerConfig { this.inactiveLedgerRollOverTimeMs = (int) unit.toMillis(inactiveLedgerRollOverTimeMs); } + public long getInactiveOffloadedLedgerEvictionTimeMs() { + return inactiveOffloadedLedgerEvictionTimeMs; + } + + public void setInactiveOffloadedLedgerEvictionTime(long inactiveOffloadedLedgerEvictionTime, TimeUnit unit) { + this.inactiveOffloadedLedgerEvictionTimeMs = unit.toMillis(inactiveOffloadedLedgerEvictionTime); + } + /** * Minimum cursors with backlog after which broker is allowed to cache read entries to reuse them for other cursors' * backlog reads. (Default = 0, broker will not cache backlog reads) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java new file mode 100644 index 00000000000..f45d115090f --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +/** + * This is a marker interface for ledger handles that represent offloaded data. + */ +public interface OffloadedLedgerHandle { + + default long lastAccessTimestamp() { + return -1; + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ede725c1a01..b2065476c3b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -112,6 +112,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundExce import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; +import org.apache.bookkeeper.mledger.OffloadedLedgerHandle; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.WaitingEntryCallBack; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; @@ -335,6 +336,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @VisibleForTesting Map<String, byte[]> createdLedgerCustomMetadata; + private long lastEvictOffloadedLedgers; + private static final int MINIMUM_EVICTION_INTERVAL_DIVIDER = 10; + public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, final String name) { @@ -1979,6 +1983,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // TODO: improve this to load ledger offloader by driver name recorded in metadata Map<String, String> 
offloadDriverMetadata = OffloadUtils.getOffloadDriverMetadata(info); offloadDriverMetadata.put("ManagedLedgerName", name); + log.info("[{}] Opening ledger {} from offload driver {} with uid {}", name, ledgerId, + config.getLedgerOffloader().getOffloadDriverName(), uid); openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid, offloadDriverMetadata); } else { @@ -2004,11 +2010,20 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { void invalidateReadHandle(long ledgerId) { CompletableFuture<ReadHandle> rhf = ledgerCache.remove(ledgerId); if (rhf != null) { - rhf.thenAccept(ReadHandle::closeAsync) - .exceptionally(ex -> { - log.warn("[{}] Failed to close a Ledger ReadHandle:", name, ex); - return null; - }); + rhf.thenCompose(r -> { + if (r instanceof OffloadedLedgerHandle) { + log.info("[{}] Closing ledger {} from offload driver {}", name, ledgerId, + config.getLedgerOffloader().getOffloadDriverName()); + } + return r.closeAsync().exceptionally(ex -> { + log.warn("[{}] Failed to close ledger {} ReadHandle with type {}", name, ledgerId, + r.getClass().getName(), ex); + return null; + }); + }).exceptionally(ex -> { + log.warn("[{}] Failed to close Ledger ReadHandle {}:", name, ledgerId, ex); + return null; + }); } } @@ -2623,7 +2638,56 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return Optional.ofNullable(ledgerOffloader.getOffloadPolicies()); } + @VisibleForTesting + synchronized List<Long> internalEvictOffloadedLedgers() { + long inactiveOffloadedLedgerEvictionTimeMs = config.getInactiveOffloadedLedgerEvictionTimeMs(); + if (inactiveOffloadedLedgerEvictionTimeMs <= 0) { + return Collections.emptyList(); + } + + long now = clock.millis(); + long minimumEvictionIntervalMs = inactiveOffloadedLedgerEvictionTimeMs / MINIMUM_EVICTION_INTERVAL_DIVIDER; + if (now - lastEvictOffloadedLedgers < minimumEvictionIntervalMs) { + // skip eviction if we have done it recently + return Collections.emptyList(); + } 
+ + try { + List<Long> ledgersToRelease = new ArrayList<>(); + + ledgerCache.forEach((ledgerId, ledger) -> { + if (ledger.isDone() && !ledger.isCompletedExceptionally()) { + ReadHandle readHandle = ledger.join(); + if (readHandle instanceof OffloadedLedgerHandle) { + long lastAccessTimestamp = ((OffloadedLedgerHandle) readHandle).lastAccessTimestamp(); + if (lastAccessTimestamp >= 0) { + long delta = now - lastAccessTimestamp; + if (delta >= inactiveOffloadedLedgerEvictionTimeMs) { + log.info("[{}] Offloaded ledger {} can be released ({} ms elapsed since last access)", + name, ledgerId, delta); + ledgersToRelease.add(ledgerId); + } else if (log.isDebugEnabled()) { + log.debug( + "[{}] Offloaded ledger {} cannot be released ({} ms elapsed since last access)", + name, ledgerId, delta); + } + } + } + } + }); + for (Long ledgerId : ledgersToRelease) { + invalidateReadHandle(ledgerId); + } + return ledgersToRelease; + } finally { + lastEvictOffloadedLedgers = now; + } + } + void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) { + + internalEvictOffloadedLedgers(); + if (!factory.isMetadataServiceAvailable()) { // Defer trimming of ledger if we cannot connect to metadata service promise.completeExceptionally(new MetaStoreException("Metadata service is not available")); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java new file mode 100644 index 00000000000..d4fce5585e3 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.awaitility.Awaitility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +public class OffloadEvictUnusedLedgersTest extends MockedBookKeeperTestCase { + private static final Logger log = LoggerFactory.getLogger(OffloadEvictUnusedLedgersTest.class); + + @Test + public void testEvictUnusedLedgers() throws Exception { + OffloadPrefixReadTest.MockLedgerOffloader offloader = + new OffloadPrefixReadTest.MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + long inactiveOffloadedLedgerEvictionTimeMs = 1000; + 
config.setInactiveOffloadedLedgerEvictionTime(inactiveOffloadedLedgerEvictionTimeMs, TimeUnit.MILLISECONDS); + config.setLedgerOffloader(offloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger_evict", config); + + // no evict when no offloaded ledgers + assertTrue(ledger.internalEvictOffloadedLedgers().isEmpty()); + + int i = 0; + for (; i < 25; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + assertEquals(ledger.getLedgersInfoAsList().size(), 3); + + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + + assertEquals(ledger.getLedgersInfoAsList().size(), 3); + assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + + // ledgers should be marked as offloaded + ledger.getLedgersInfoAsList().stream().allMatch(l -> l.hasOffloadContext()); + + // no evict when no offloaded ledgers are marked as inactive + assertTrue(ledger.internalEvictOffloadedLedgers().isEmpty()); + + ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST); + int j = 0; + for (Entry e : cursor.readEntries(25)) { + assertEquals(new String(e.getData()), "entry-" + j++); + } + cursor.close(); + + // set last access time to be 2x inactiveOffloadedLedgerEvictionTimeMs + AtomicLong first = new AtomicLong(-1); + assertTrue(!ledger.ledgerCache.isEmpty()); + ledger.ledgerCache.forEach((id, l) -> { + if (first.compareAndSet(-1, id)) { + OffloadPrefixReadTest.MockOffloadReadHandle handle = + (OffloadPrefixReadTest.MockOffloadReadHandle) l.join(); + handle.setLastAccessTimestamp(System.currentTimeMillis() - inactiveOffloadedLedgerEvictionTimeMs * 2); + } + }); + assertNotEquals(first.get(), -1L); + + Awaitility.await().untilAsserted(() -> { + List<Long> evicted = ledger.internalEvictOffloadedLedgers(); + assertEquals(evicted.size(), 1); + assertEquals(first.get(), 
evicted.get(0).longValue()); + }); + } + +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index af5c46e328b..34a4ade9531 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -31,17 +31,18 @@ import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import lombok.SneakyThrows; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; @@ -57,6 +58,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.OffloadedLedgerHandle; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.util.MockClock; import org.apache.bookkeeper.net.BookieId; @@ -312,6 +314,10 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); + Set<Long> offloadedLedgers() { + return 
offloads.values().stream().map(ReadHandle::getId).collect(Collectors.toSet()); + } + @Override public String getOffloadDriverName() { @@ -372,10 +378,11 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { } } - static class MockOffloadReadHandle implements ReadHandle { + static class MockOffloadReadHandle implements ReadHandle, OffloadedLedgerHandle { final long id; final List<ByteBuf> entries = new ArrayList(); final LedgerMetadata metadata; + long lastAccessTimestamp = System.currentTimeMillis(); MockOffloadReadHandle(ReadHandle toCopy) throws Exception { id = toCopy.getId(); @@ -453,6 +460,15 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { future.completeExceptionally(new UnsupportedOperationException()); return future; } + + @Override + public long lastAccessTimestamp() { + return lastAccessTimestamp; + } + + public void setLastAccessTimestamp(long lastAccessTimestamp) { + this.lastAccessTimestamp = lastAccessTimestamp; + } } static class MockMetadata implements LedgerMetadata { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 57ea9838b2d..c25d9e9aae1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3245,6 +3245,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0; + @FieldContext( + dynamic = true, + category = CATEGORY_STORAGE_ML, + doc = "Time to evict inactive offloaded ledger for inactive topic. 
" + + "Disable eviction with value 0 (Default value 600)" + ) + private int managedLedgerInactiveOffloadedLedgerEvictionTimeSeconds = 600; + @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7b115b155a5..d1d60095b5b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2021,6 +2021,10 @@ public class BrokerService implements Closeable { managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); managedLedgerConfig.setInactiveLedgerRollOverTime( serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS); + managedLedgerConfig.setInactiveOffloadedLedgerEvictionTime( + serviceConfig.getManagedLedgerInactiveOffloadedLedgerEvictionTimeSeconds(), + TimeUnit.SECONDS); + managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( serviceConfig.isCacheEvictionByMarkDeletedPosition()); managedLedgerConfig.setMinimumBacklogCursorsForCaching( diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index e050d74a332..1f2f901f514 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import 
org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.OffloadedLedgerHandle; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; @@ -52,7 +53,7 @@ import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BlobStoreBackedReadHandleImpl implements ReadHandle { +public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedgerHandle { private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class); private final long ledgerId; @@ -70,6 +71,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { private volatile State state = null; + private volatile long lastAccessTimestamp = System.currentTimeMillis(); + private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, BackedInputStream inputStream, ExecutorService executor, OffsetsCache entryOffsetsCache) { @@ -119,7 +122,9 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { getId(), firstEntry, lastEntry, (1 + lastEntry - firstEntry)); } CompletableFuture<LedgerEntries> promise = new CompletableFuture<>(); + touch(); executor.execute(() -> { + touch(); if (state == State.Closed) { log.warn("Reading a closed read handler. 
Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry); @@ -208,6 +213,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { } private void seekToEntry(long nextExpectedId) throws IOException { + touch(); Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId); if (knownOffset != null) { inputStream.seek(knownOffset); @@ -312,4 +318,13 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { State getState() { return this.state; } + + @Override + public long lastAccessTimestamp() { + return lastAccessTimestamp; + } + + private void touch() { + lastAccessTimestamp = System.currentTimeMillis(); + } }
