This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new f8d2cc4cd9e [fix][offload] Fix OOM in tiered storage, caused by
unbounded offsets cache (#22679)
f8d2cc4cd9e is described below
commit f8d2cc4cd9ed847fa68859faa475b6a64717ff38
Author: Lari Hotari <[email protected]>
AuthorDate: Thu May 9 10:25:13 2024 +0300
[fix][offload] Fix OOM in tiered storage, caused by unbounded offsets cache
(#22679)
Co-authored-by: Jiwe Guo <[email protected]>
(cherry picked from commit 566330ca8d0b3419853e0252276ef42c643d3465)
---
.../bookkeeper/mledger/LedgerOffloaderFactory.java | 7 +-
.../bookkeeper/mledger/offload/Offloaders.java | 6 ++
.../jcloud/JCloudLedgerOffloaderFactory.java | 16 ++--
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 24 +++---
.../impl/BlobStoreManagedLedgerOffloader.java | 14 +++-
.../mledger/offload/jcloud/impl/OffsetsCache.java | 85 ++++++++++++++++++++++
.../impl/BlobStoreManagedLedgerOffloaderBase.java | 9 +++
...obStoreManagedLedgerOffloaderStreamingTest.java | 4 +-
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 6 +-
.../offload/jcloud/impl/OffsetsCacheTest.java | 45 ++++++++++++
10 files changed, 185 insertions(+), 31 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index 7ecb8f08d57..9fbf9b73c05 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaStorage;
*/
@LimitedPrivate
@Evolving
-public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
+public interface LedgerOffloaderFactory<T extends LedgerOffloader> extends
AutoCloseable {
/**
* Check whether the provided driver <tt>driverName</tt> is supported.
@@ -111,4 +111,9 @@ public interface LedgerOffloaderFactory<T extends
LedgerOffloader> {
throws IOException {
return create(offloadPolicies, userMetadata, scheduler,
offloaderStats);
}
+
+ @Override
+ default void close() throws Exception {
+ // no-op
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
index 6910439e091..cec15599242 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
@@ -46,6 +46,12 @@ public class Offloaders implements AutoCloseable {
@Override
public void close() throws Exception {
offloaders.forEach(offloader -> {
+ try {
+ offloader.getRight().close();
+ } catch (Exception e) {
+ log.warn("Failed to close offloader '{}': {}",
+ offloader.getRight().getClass(), e.getMessage());
+ }
try {
offloader.getLeft().close();
} catch (IOException e) {
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
index 2c916567444..60363cf8406 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
import
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
import
org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -33,12 +34,7 @@ import
org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
* A jcloud based offloader factory.
*/
public class JCloudLedgerOffloaderFactory implements
LedgerOffloaderFactory<BlobStoreManagedLedgerOffloader> {
-
- public static JCloudLedgerOffloaderFactory of() {
- return INSTANCE;
- }
-
- private static final JCloudLedgerOffloaderFactory INSTANCE = new
JCloudLedgerOffloaderFactory();
+ private final OffsetsCache entryOffsetsCache = new OffsetsCache();
@Override
public boolean isDriverSupported(String driverName) {
@@ -58,6 +54,12 @@ public class JCloudLedgerOffloaderFactory implements
LedgerOffloaderFactory<Blob
TieredStorageConfiguration config =
TieredStorageConfiguration.create(offloadPolicies.toProperties());
- return BlobStoreManagedLedgerOffloader.create(config, userMetadata,
scheduler, offloaderStats);
+ return BlobStoreManagedLedgerOffloader.create(config, userMetadata,
scheduler, offloaderStats,
+ entryOffsetsCache);
+ }
+
+ @Override
+ public void close() throws Exception {
+ entryOffsetsCache.close();
}
}
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 4f68f90370e..e050d74a332 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,8 +19,6 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
@@ -56,19 +54,13 @@ import org.slf4j.LoggerFactory;
public class BlobStoreBackedReadHandleImpl implements ReadHandle {
private static final Logger log =
LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
- private static final int CACHE_TTL_SECONDS =
-
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds",
30 * 60);
private final long ledgerId;
private final OffloadIndexBlock index;
private final BackedInputStream inputStream;
private final DataInputStream dataStream;
private final ExecutorService executor;
- // this Cache is accessed only by one thread
- private final Cache<Long, Long> entryOffsets = CacheBuilder
- .newBuilder()
- .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
- .build();
+ private final OffsetsCache entryOffsetsCache;
private final AtomicReference<CompletableFuture<Void>> closeFuture = new
AtomicReference<>();
enum State {
@@ -79,12 +71,14 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
private volatile State state = null;
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock
index,
- BackedInputStream inputStream,
ExecutorService executor) {
+ BackedInputStream inputStream,
ExecutorService executor,
+ OffsetsCache entryOffsetsCache) {
this.ledgerId = ledgerId;
this.index = index;
this.inputStream = inputStream;
this.dataStream = new DataInputStream(inputStream);
this.executor = executor;
+ this.entryOffsetsCache = entryOffsetsCache;
state = State.Opened;
}
@@ -109,7 +103,6 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
try {
index.close();
inputStream.close();
- entryOffsets.invalidateAll();
state = State.Closed;
promise.complete(null);
} catch (IOException t) {
@@ -164,7 +157,7 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
long entryId = dataStream.readLong();
if (entryId == nextExpectedId) {
- entryOffsets.put(entryId, currentPosition);
+ entryOffsetsCache.put(ledgerId, entryId,
currentPosition);
ByteBuf buf =
PulsarByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId,
length, buf));
int toWrite = length;
@@ -215,7 +208,7 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
}
private void seekToEntry(long nextExpectedId) throws IOException {
- Long knownOffset = entryOffsets.getIfPresent(nextExpectedId);
+ Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId,
nextExpectedId);
if (knownOffset != null) {
inputStream.seek(knownOffset);
} else {
@@ -269,7 +262,8 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
BlobStore blobStore, String bucket, String
key, String indexKey,
VersionCheck versionCheck,
long ledgerId, int readBufferSize,
- LedgerOffloaderStats offloaderStats, String
managedLedgerName)
+ LedgerOffloaderStats offloaderStats, String
managedLedgerName,
+ OffsetsCache entryOffsetsCache)
throws IOException, BKException.BKNoSuchLedgerExistsException {
int retryCount = 3;
OffloadIndexBlock index = null;
@@ -310,7 +304,7 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
BackedInputStream inputStream = new
BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
versionCheck, index.getDataObjectLength(), readBufferSize,
offloaderStats, managedLedgerName);
- return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream,
executor);
+ return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream,
executor, entryOffsetsCache);
}
// for testing
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 1b6062ffa03..9f89bd52a86 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -108,6 +108,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
private AtomicLong bufferLength = new AtomicLong(0);
private AtomicLong segmentLength = new AtomicLong(0);
private final long maxBufferLength;
+ private final OffsetsCache entryOffsetsCache;
private final ConcurrentLinkedQueue<Entry> offloadBuffer = new
ConcurrentLinkedQueue<>();
private CompletableFuture<OffloadResult> offloadResult;
private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST;
@@ -123,13 +124,16 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
public static BlobStoreManagedLedgerOffloader
create(TieredStorageConfiguration config,
Map<String, String>
userMetadata,
OrderedScheduler
scheduler,
- LedgerOffloaderStats
offloaderStats) throws IOException {
+ LedgerOffloaderStats
offloaderStats,
+ OffsetsCache
entryOffsetsCache)
+ throws IOException {
- return new BlobStoreManagedLedgerOffloader(config, scheduler,
userMetadata, offloaderStats);
+ return new BlobStoreManagedLedgerOffloader(config, scheduler,
userMetadata, offloaderStats, entryOffsetsCache);
}
BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config,
OrderedScheduler scheduler,
- Map<String, String> userMetadata,
LedgerOffloaderStats offloaderStats) {
+ Map<String, String> userMetadata,
LedgerOffloaderStats offloaderStats,
+ OffsetsCache entryOffsetsCache) {
this.scheduler = scheduler;
this.userMetadata = userMetadata;
@@ -140,6 +144,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
this.minSegmentCloseTimeMillis =
Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
//ensure buffer can have enough content to fill a block
this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(),
config.getMinBlockSizeInBytes());
+ this.entryOffsetsCache = entryOffsetsCache;
this.segmentBeginTimeMillis = System.currentTimeMillis();
if (!Strings.isNullOrEmpty(config.getRegion())) {
this.writeLocation = new LocationBuilder()
@@ -555,7 +560,8 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
readBucket, key, indexKey,
DataBlockUtils.VERSION_CHECK,
ledgerId, config.getReadBufferSizeInBytes(),
- this.offloaderStats,
offloadDriverMetadata.get(MANAGED_LEDGER_NAME)));
+ this.offloaderStats,
offloadDriverMetadata.get(MANAGED_LEDGER_NAME),
+ this.entryOffsetsCache));
} catch (Throwable t) {
log.error("Failed readOffloaded: ", t);
promise.completeExceptionally(t);
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java
new file mode 100644
index 00000000000..fa13afa8ff0
--- /dev/null
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class OffsetsCache implements AutoCloseable {
+ private static final int CACHE_TTL_SECONDS =
+
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 5
* 60);
+ // limit the cache size to avoid OOM
+ // 1 million entries consumes about 60MB of heap space
+ private static final int CACHE_MAX_SIZE =
+
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.max.size",
1_000_000);
+ private final ScheduledExecutorService cacheEvictionExecutor;
+
+ record Key(long ledgerId, long entryId) {
+
+ }
+
+ private final Cache<OffsetsCache.Key, Long> entryOffsetsCache;
+
+ public OffsetsCache() {
+ if (CACHE_MAX_SIZE > 0) {
+ entryOffsetsCache = CacheBuilder
+ .newBuilder()
+ .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
+ .maximumSize(CACHE_MAX_SIZE)
+ .build();
+ cacheEvictionExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new
DefaultThreadFactory("jcloud-offsets-cache-eviction"));
+ int period = Math.max(CACHE_TTL_SECONDS / 2, 1);
+ cacheEvictionExecutor.scheduleAtFixedRate(() -> {
+ entryOffsetsCache.cleanUp();
+ }, period, period, TimeUnit.SECONDS);
+ } else {
+ cacheEvictionExecutor = null;
+ entryOffsetsCache = null;
+ }
+ }
+
+ public void put(long ledgerId, long entryId, long currentPosition) {
+ if (entryOffsetsCache != null) {
+ entryOffsetsCache.put(new Key(ledgerId, entryId), currentPosition);
+ }
+ }
+
+ public Long getIfPresent(long ledgerId, long entryId) {
+ return entryOffsetsCache != null ? entryOffsetsCache.getIfPresent(new
Key(ledgerId, entryId)) : null;
+ }
+
+ public void clear() {
+ if (entryOffsetsCache != null) {
+ entryOffsetsCache.invalidateAll();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (cacheEvictionExecutor != null) {
+ cacheEvictionExecutor.shutdownNow();
+ }
+ }
+}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index 89d9021d36d..75faf098b40 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.domain.Credentials;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
public abstract class BlobStoreManagedLedgerOffloaderBase {
@@ -46,6 +47,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
protected final JCloudBlobStoreProvider provider;
protected TieredStorageConfiguration config;
protected BlobStore blobStore = null;
+ protected final OffsetsCache entryOffsetsCache = new OffsetsCache();
protected BlobStoreManagedLedgerOffloaderBase() throws Exception {
scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(5).name("offloader").build();
@@ -56,6 +58,13 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
@AfterMethod(alwaysRun = true)
public void cleanupMockBookKeeper() {
bk.getLedgerMap().clear();
+ entryOffsetsCache.clear();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void cleanup() throws Exception {
+ entryOffsetsCache.close();
+ scheduler.shutdownNow();
}
protected static MockManagedLedger createMockManagedLedger() {
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
index ad1529072f8..e706e4254cb 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -82,7 +82,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest
extends BlobStoreManag
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use
the REAL blobStore
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader
- .create(mockedConfig, new HashMap<String, String>(),
scheduler, this.offloaderStats);
+ .create(mockedConfig, new HashMap<String, String>(),
scheduler, this.offloaderStats, entryOffsetsCache);
return offloader;
}
@@ -91,7 +91,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest
extends BlobStoreManag
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader
- .create(mockedConfig, new HashMap<String, String>(),
scheduler, this.offloaderStats);
+ .create(mockedConfig, new HashMap<String, String>(),
scheduler, this.offloaderStats, entryOffsetsCache);
return offloader;
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 4419210c251..bf6ede896ab 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -98,14 +98,16 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws
IOException {
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use
the REAL blobStore
- BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, this.offloaderStats);
+ BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, this.offloaderStats,
+ entryOffsetsCache);
return offloader;
}
private BlobStoreManagedLedgerOffloader getOffloader(String bucket,
BlobStore mockedBlobStore) throws IOException {
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
- BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, this.offloaderStats);
+ BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, this.offloaderStats,
+ entryOffsetsCache);
return offloader;
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java
new file mode 100644
index 00000000000..86a72c7b554
--- /dev/null
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+@Slf4j
+public class OffsetsCacheTest {
+
+ @Test
+ public void testCache() throws Exception {
+
System.setProperty("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds",
"1");
+ OffsetsCache offsetsCache = new OffsetsCache();
+ assertNull(offsetsCache.getIfPresent(1, 2));
+ offsetsCache.put(1, 1, 1);
+ assertEquals(offsetsCache.getIfPresent(1, 1), 1);
+ offsetsCache.clear();
+ assertNull(offsetsCache.getIfPresent(1, 1));
+ // test ttl
+ offsetsCache.put(1, 2, 2);
+ assertEquals(offsetsCache.getIfPresent(1, 2), 2);
+ Thread.sleep(1500);
+ assertNull(offsetsCache.getIfPresent(1, 2));
+ offsetsCache.close();
+ }
+}