This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 7f12634fd7b Add a cache eviction policy:Evicting cache data by the
slowest markDeletedPosition (#14985)
7f12634fd7b is described below
commit 7f12634fd7b1bf6e4bd9876f8c51490a6780ac36
Author: LinChen <[email protected]>
AuthorDate: Wed Apr 6 15:53:58 2022 +0800
Add a cache eviction policy:Evicting cache data by the slowest
markDeletedPosition (#14985)
(cherry picked from commit 9b36dcd777b7e3b159b57cd409784a3622483132)
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 5 +
.../mledger/impl/ManagedCursorContainer.java | 4 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 25 ++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 198 +++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 9 +-
.../pulsar/broker/service/BrokerService.java | 2 +
6 files changed, 238 insertions(+), 5 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 2f4e098d677..d7d3b1c02cd 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;
@@ -77,6 +79,9 @@ public class ManagedLedgerConfig {
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private ManagedLedgerInterceptor managedLedgerInterceptor;
+ // When true, cache eviction uses the earliest mark-deleted position of the active cursors
+ // as its eviction position; when false (the default) it uses their earliest read position.
+ @Getter
+ @Setter
+ private boolean cacheEvictionByMarkDeletedPosition = false;
public boolean isCreateIfMissing() {
return createIfMissing;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index c631cdcf96d..4c3d4134379 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -108,6 +108,10 @@ public class ManagedCursorContainer implements
Iterable<ManagedCursor> {
return heap.isEmpty() ? null : (PositionImpl)
heap.get(0).cursor.getReadPosition();
}
+    /**
+     * Returns the mark-deleted position of the cursor at the head of the heap.
+     *
+     * NOTE(review): the heap ordering is not visible here. If the heap is ordered by read
+     * position, the head cursor's mark-deleted position may not be the minimum across all
+     * cursors -- confirm against the container's comparator.
+     *
+     * @return the head cursor's mark-deleted position, or {@code null} when the heap is empty
+     */
+    public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
+        if (heap.isEmpty()) {
+            return null;
+        }
+        return (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
+    }
+
public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 89e94934ded..cd47cf31ba3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2127,10 +2127,15 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
if (entryCache.getSize() <= 0) {
return;
}
- // Always remove all entries already read by active cursors
- PositionImpl slowestReaderPos =
getEarlierReadPositionForActiveCursors();
- if (slowestReaderPos != null) {
- entryCache.invalidateEntries(slowestReaderPos);
+        // Pick the position used for cache invalidation according to the configured policy.
+        PositionImpl evictionPos;
+        if (config.isCacheEvictionByMarkDeletedPosition()) {
+            // The helper returns null when neither cursor container has an active cursor,
+            // so guard before calling getNext() to avoid a NullPointerException.
+            PositionImpl slowestMarkDeleted = getEarlierMarkDeletedPositionForActiveCursors();
+            evictionPos = slowestMarkDeleted != null ? slowestMarkDeleted.getNext() : null;
+        } else {
+            // Always remove all entries already read by active cursors
+            evictionPos = getEarlierReadPositionForActiveCursors();
+        }
+        if (evictionPos != null) {
+            entryCache.invalidateEntries(evictionPos);
         }
// Remove entries older than the cutoff threshold
@@ -2149,6 +2154,18 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return durablePosition.compareTo(nonDurablePosition) > 0 ?
nonDurablePosition : durablePosition;
}
+    /**
+     * Picks the earlier of the two mark-deleted positions reported by the non-durable and the
+     * durable active-cursor containers.
+     *
+     * @return the smaller of the two positions; the only non-null one when the other container
+     *         reports {@code null}; or {@code null} when both do
+     */
+    private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
+        PositionImpl fromNonDurable = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
+        PositionImpl fromDurable = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
+        if (fromNonDurable == null || fromDurable == null) {
+            // At most one side is usable: hand back whichever it is (possibly null).
+            return fromNonDurable == null ? fromDurable : fromNonDurable;
+        }
+        // On a tie the durable position wins, as before.
+        return fromDurable.compareTo(fromNonDurable) <= 0 ? fromDurable : fromNonDurable;
+    }
+
void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor,
newPosition);
if (pair == null) {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1e499aeb31b..348ee81d4e2 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -293,6 +293,204 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ledger.close();
}
+    /**
+     * With eviction by mark-deleted position enabled, an entry that has been read but not yet
+     * mark-deleted must stay in the cache after an eviction pass, and must be gone once the
+     * cursor has mark-deleted it and eviction runs again.
+     *
+     * The test is bounded by a timeout and every callback path completes {@code result}, so a
+     * failed check inside a callback fails the test instead of leaving it blocked on
+     * {@code result.get()}.
+     */
+    @Test(timeOut = 20000)
+    public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true);
+        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+                .toNanos(30000));
+        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+            @Override
+            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
+                    @Override
+                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                        ManagedLedger ledger = (ManagedLedger) ctx;
+                        String message1 = "test";
+                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
+                            @Override
+                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                                @SuppressWarnings("unchecked")
+                                Pair<ManagedLedger, ManagedCursor> pair =
+                                        (Pair<ManagedLedger, ManagedCursor>) ctx;
+                                ManagedLedger ledger = pair.getLeft();
+                                ManagedCursor cursor = pair.getRight();
+                                // The freshly added entry is expected to be cached.
+                                if (((ManagedLedgerImpl) ledger).getCacheSize()
+                                        != message1.getBytes(Encoding).length) {
+                                    result.complete(false);
+                                    return;
+                                }
+                                cursor.asyncReadEntries(1, new ReadEntriesCallback() {
+                                    @Override
+                                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                                        ManagedCursor cursor = (ManagedCursor) ctx;
+                                        // Report through the future rather than throwing from the
+                                        // callback, which would leave the future incomplete.
+                                        if (entries.size() != 1) {
+                                            result.completeExceptionally(new AssertionError(
+                                                    "expected 1 entry but got " + entries.size()));
+                                            return;
+                                        }
+                                        Entry entry = entries.get(0);
+                                        final Position position = entry.getPosition();
+                                        if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
+                                            result.complete(false);
+                                            return;
+                                        }
+                                        // Read but not mark-deleted: the entry must survive eviction.
+                                        ((ManagedLedgerImpl) ledger).doCacheEviction(
+                                                System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
+                                        if (((ManagedLedgerImpl) ledger).getCacheSize()
+                                                != message1.getBytes(Encoding).length) {
+                                            result.complete(false);
+                                            return;
+                                        }
+
+                                        log.debug("Mark-Deleting to position {}", position);
+                                        cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
+                                            @Override
+                                            public void markDeleteComplete(Object ctx) {
+                                                log.debug("Mark delete complete");
+                                                ManagedCursor cursor = (ManagedCursor) ctx;
+                                                if (cursor.hasMoreEntries()) {
+                                                    result.complete(false);
+                                                    return;
+                                                }
+                                                // Mark-deleted: the next eviction pass must drop it.
+                                                ((ManagedLedgerImpl) ledger).doCacheEviction(
+                                                        System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
+                                                result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
+                                            }
+
+                                            @Override
+                                            public void markDeleteFailed(ManagedLedgerException exception,
+                                                                         Object ctx) {
+                                                result.completeExceptionally(exception);
+                                            }
+
+                                        }, cursor);
+                                    }
+
+                                    @Override
+                                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                                        result.completeExceptionally(exception);
+                                    }
+                                }, cursor, PositionImpl.latest);
+                            }
+
+                            @Override
+                            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                                result.completeExceptionally(exception);
+                            }
+                        }, Pair.of(ledger, cursor));
+                    }
+
+                    @Override
+                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                        result.completeExceptionally(exception);
+                    }
+
+                }, ledger);
+            }
+
+            @Override
+            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                result.completeExceptionally(exception);
+            }
+        }, null, null);
+
+        assertTrue(result.get());
+
+        log.info("Test completed");
+    }
+
+    /**
+     * With the default configuration (eviction by mark-deleted position left disabled), an
+     * entry must be dropped from the cache by an eviction pass as soon as the cursor has read
+     * it, before any mark-delete happens.
+     *
+     * The test is bounded by a timeout and every callback path completes {@code result}, so a
+     * failed check inside a callback fails the test instead of leaving it blocked on
+     * {@code result.get()}.
+     */
+    @Test(timeOut = 20000)
+    public void testCacheEvictionByReadPosition() throws Throwable {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+                .toNanos(30000));
+        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+            @Override
+            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
+                    @Override
+                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                        ManagedLedger ledger = (ManagedLedger) ctx;
+                        String message1 = "test";
+                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
+                            @Override
+                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                                @SuppressWarnings("unchecked")
+                                Pair<ManagedLedger, ManagedCursor> pair =
+                                        (Pair<ManagedLedger, ManagedCursor>) ctx;
+                                ManagedLedger ledger = pair.getLeft();
+                                ManagedCursor cursor = pair.getRight();
+                                // The freshly added entry is expected to be cached.
+                                if (((ManagedLedgerImpl) ledger).getCacheSize()
+                                        != message1.getBytes(Encoding).length) {
+                                    result.complete(false);
+                                    return;
+                                }
+
+                                cursor.asyncReadEntries(1, new ReadEntriesCallback() {
+                                    @Override
+                                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                                        ManagedCursor cursor = (ManagedCursor) ctx;
+                                        // Report through the future rather than throwing from the
+                                        // callback, which would leave the future incomplete.
+                                        if (entries.size() != 1) {
+                                            result.completeExceptionally(new AssertionError(
+                                                    "expected 1 entry but got " + entries.size()));
+                                            return;
+                                        }
+                                        Entry entry = entries.get(0);
+                                        final Position position = entry.getPosition();
+                                        if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
+                                            result.complete(false);
+                                            return;
+                                        }
+                                        // Already read: eviction by read position must drop the entry.
+                                        ((ManagedLedgerImpl) ledger).doCacheEviction(
+                                                System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
+                                        if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) {
+                                            result.complete(false);
+                                            return;
+                                        }
+
+                                        log.debug("Mark-Deleting to position {}", position);
+                                        cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
+                                            @Override
+                                            public void markDeleteComplete(Object ctx) {
+                                                log.debug("Mark delete complete");
+                                                ManagedCursor cursor = (ManagedCursor) ctx;
+                                                if (cursor.hasMoreEntries()) {
+                                                    result.complete(false);
+                                                    return;
+                                                }
+                                                result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
+                                            }
+
+                                            @Override
+                                            public void markDeleteFailed(ManagedLedgerException exception,
+                                                                         Object ctx) {
+                                                result.completeExceptionally(exception);
+                                            }
+
+                                        }, cursor);
+                                    }
+
+                                    @Override
+                                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                                        result.completeExceptionally(exception);
+                                    }
+                                }, cursor, PositionImpl.latest);
+                            }
+
+                            @Override
+                            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                                result.completeExceptionally(exception);
+                            }
+                        }, Pair.of(ledger, cursor));
+                    }
+
+                    @Override
+                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                        result.completeExceptionally(exception);
+                    }
+
+                }, ledger);
+            }
+
+            @Override
+            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                result.completeExceptionally(exception);
+            }
+        }, null, null);
+
+        assertTrue(result.get());
+
+        log.info("Test completed");
+    }
+
@Test(timeOut = 20000)
public void asyncAPI() throws Throwable {
final CountDownLatch counter = new CountDownLatch(1);
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ee7ed9e5d7c..65dad25cf1a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2234,7 +2234,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int managedLedgerOffloadPrefetchRounds = 1;
- /**** --- Transaction config variables --- ****/
+ // true: managed-ledger cache eviction uses the earliest mark-deleted position of the active
+ // cursors; false (default): it uses their earliest read position.
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Evicting cache data by the slowest markDeletedPosition or
readPosition. "
+ + "The default is to evict through readPosition."
+ )
+ private boolean cacheEvictionByMarkDeletedPosition = false;
+
+ /**** --- Transaction config variables. --- ****/
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Enable transaction coordinator in broker"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 39eeed7ae02..b251b0d843b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1530,6 +1530,8 @@ public class BrokerService implements Closeable {
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+ managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+
serviceConfig.isCacheEvictionByMarkDeletedPosition());
OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p ->
p.offload_policies).orElse(null);