This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a8cd908 Perform periodic flush of ManagedCursor mark-delete positions (#8634) a8cd908 is described below commit a8cd908b45c2741a00411c02afc116a435e30ef5 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Nov 19 16:30:14 2020 -0800 Perform periodic flush of ManagedCursor mark-delete positions (#8634) --- conf/broker.conf | 4 ++ conf/standalone.conf | 4 ++ .../mledger/ManagedLedgerFactoryConfig.java | 5 ++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++++ .../mledger/impl/ManagedLedgerFactoryImpl.java | 15 ++++++ .../bookkeeper/mledger/impl/ManagedCursorTest.java | 53 +++++++++++++++++++++- .../apache/pulsar/broker/ServiceConfiguration.java | 5 ++ .../pulsar/broker/ManagedLedgerClientFactory.java | 1 + 8 files changed, 107 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 5fe9e8d..416b9e4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -788,6 +788,10 @@ managedLedgerDefaultWriteQuorum=2 # Number of guaranteed copies (acks to wait before write is complete) managedLedgerDefaultAckQuorum=2 +# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). +# Default is 60 seconds +managedLedgerCursorPositionFlushSeconds = 60 + # Default type of checksum to use when writing to BookKeeper. Default is "CRC32C" # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum). managedLedgerDigestType=CRC32C diff --git a/conf/standalone.conf b/conf/standalone.conf index 4cea72d..9deb862 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -536,6 +536,10 @@ managedLedgerDefaultWriteQuorum=1 # Number of guaranteed copies (acks to wait before write is complete) managedLedgerDefaultAckQuorum=1 +# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). 
+# Default is 60 seconds +managedLedgerCursorPositionFlushSeconds = 60 + # Default type of checksum to use when writing to BookKeeper. Default is "CRC32C" # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum). managedLedgerDigestType=CRC32C diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index f1223e5..d2109ea 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -67,6 +67,11 @@ public class ManagedLedgerFactoryConfig { private int prometheusStatsLatencyRolloverSeconds = 60; /** + * How frequently to flush the cursor positions that were accumulated due to rate limiting. + */ + private int cursorPositionFlushSeconds = 60; + + /** * cluster name for prometheus stats */ private String clusterName; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 5b59986..1b7d22e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -169,6 +169,9 @@ public class ManagedCursorImpl implements ManagedCursor { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private RateLimiter markDeleteLimiter; + // The cursor is considered "dirty" when there are mark-delete updates that are only applied in memory, + // because of the rate limiting. 
+ private volatile boolean isDirty = false; private boolean alwaysInactive = false; @@ -1626,6 +1629,7 @@ public class ManagedCursorImpl implements ManagedCursor { // Apply rate limiting to mark-delete operations if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { + isDirty = true; lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, null, null); callback.markDeleteComplete(ctx); return; @@ -2883,6 +2887,24 @@ public class ManagedCursorImpl implements ManagedCursor { this.entriesReadSize += readEntriesSize; } + void flush() { + if (!isDirty) { + return; + } + + isDirty = false; + asyncMarkDelete(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}][{}] Failed to flush mark-delete position", ledger.getName(), name, exception); + } + }, null); + } + private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { if (maxSizeBytes == NO_MAX_SIZE_LIMIT) { return maxEntries; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index a2c668e..f411eff 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -108,6 +108,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private long lastStatTimestamp = System.nanoTime(); private final ScheduledFuture<?> statsTask; + private final ScheduledFuture<?> flushCursorsTask; private final long cacheEvictionTimeThresholdNanos; private final MetadataStore metadataStore; @@ -202,6 +203,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { this.mbean = 
new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats, 0, StatsPeriodSeconds, TimeUnit.SECONDS); + this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(this::flushCursors, + config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS); this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS @@ -230,6 +233,17 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { } } + private synchronized void flushCursors() { + ledgers.values().forEach(mlfuture -> { + if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) { + ManagedLedgerImpl ml = mlfuture.getNow(null); + if (ml != null) { + ml.getCursors().forEach(c -> ((ManagedCursorImpl) c).flush()); + } + } + }); + } + private synchronized void refreshStats() { long now = System.nanoTime(); long period = now - lastStatTimestamp; @@ -483,6 +497,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { @Override public void shutdown() throws InterruptedException, ManagedLedgerException { statsTask.cancel(true); + flushCursorsTask.cancel(true); int numLedgers = ledgers.size(); final CountDownLatch latch = new CountDownLatch(numLedgers); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index e53bcae..ffee4f4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3190,8 +3190,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { } return result; } - - void testReadEntriesOrWaitWithMaxSize() throws Exception { + + @Test + public void testReadEntriesOrWaitWithMaxSize() throws Exception { ManagedLedger ledger = 
factory.open("testReadEntriesOrWaitWithMaxSize"); ManagedCursor c = ledger.openCursor("c"); @@ -3215,5 +3216,53 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { entries.forEach(e -> e.release()); } + @Test + public void testFlushCursorAfterInactivity() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setThrottleMarkDelete(1.0); + + ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig(); + factoryConfig.setCursorPositionFlushSeconds(1); + ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), factoryConfig); + ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config); + ManagedCursor c1 = ledger1.openCursor("c"); + List<Position> positions = new ArrayList<Position>(); + + for (int i = 0; i < 20; i++) { + positions.add(ledger1.addEntry(new byte[1024])); + } + + CountDownLatch latch = new CountDownLatch(positions.size()); + + positions.forEach(p -> c1.asyncMarkDelete(p, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + throw new RuntimeException(exception); + } + }, null)); + + latch.await(); + + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + + // Give chance to the flush to be automatically triggered. 
+ Thread.sleep(3000); + + // Abruptly re-open the managed ledger without graceful close + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); + ManagedCursor c2 = ledger2.openCursor("c"); + + assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + + factory1.shutdown(); + factory2.shutdown(); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 6e55f6f..81e1fa4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1215,6 +1215,11 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int managedLedgerDefaultAckQuorum = 2; + @FieldContext(minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). 
Default is 60 seconds") + private int managedLedgerCursorPositionFlushSeconds = 60; + // // @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index a48224c..5f6cdaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -64,6 +64,7 @@ public class ManagedLedgerClientFactory implements Closeable { managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries()); managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()); managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); + managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds()); Configuration configuration = new ClientConfiguration(); if (conf.isBookkeeperClientExposeStatsToPrometheus()) {