This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a8cd908  Perform periodic flush of ManagedCursor mark-delete 
positions (#8634)
a8cd908 is described below

commit a8cd908b45c2741a00411c02afc116a435e30ef5
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Nov 19 16:30:14 2020 -0800

    Perform periodic flush of ManagedCursor mark-delete positions (#8634)
---
 conf/broker.conf                                   |  4 ++
 conf/standalone.conf                               |  4 ++
 .../mledger/ManagedLedgerFactoryConfig.java        |  5 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 15 ++++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 53 +++++++++++++++++++++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  5 ++
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  1 +
 8 files changed, 107 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 5fe9e8d..416b9e4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -788,6 +788,10 @@ managedLedgerDefaultWriteQuorum=2
 # Number of guaranteed copies (acks to wait before write is complete)
 managedLedgerDefaultAckQuorum=2
 
+# How frequently (in seconds) to flush the cursor positions that were accumulated due to 
rate limiting.
+# Default is 60 seconds
+managedLedgerCursorPositionFlushSeconds = 60
+
 # Default type of checksum to use when writing to BookKeeper. Default is 
"CRC32C"
 # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
 managedLedgerDigestType=CRC32C
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 4cea72d..9deb862 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -536,6 +536,10 @@ managedLedgerDefaultWriteQuorum=1
 # Number of guaranteed copies (acks to wait before write is complete)
 managedLedgerDefaultAckQuorum=1
 
+# How frequently (in seconds) to flush the cursor positions that were accumulated due to 
rate limiting.
+# Default is 60 seconds
+managedLedgerCursorPositionFlushSeconds = 60
+
 # Default type of checksum to use when writing to BookKeeper. Default is 
"CRC32C"
 # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
 managedLedgerDigestType=CRC32C
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index f1223e5..d2109ea 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -67,6 +67,11 @@ public class ManagedLedgerFactoryConfig {
     private int prometheusStatsLatencyRolloverSeconds = 60;
 
     /**
+     * How frequently to flush the cursor positions that were accumulated due 
to rate limiting.
+     */
+    private int cursorPositionFlushSeconds = 60;
+
+    /**
      * cluster name for prometheus stats
      */
     private String clusterName;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 5b59986..1b7d22e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -169,6 +169,9 @@ public class ManagedCursorImpl implements ManagedCursor {
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private RateLimiter markDeleteLimiter;
+    // The cursor is considered "dirty" when there are mark-delete updates 
that are only applied in memory,
+    // because of the rate limiting.
+    private volatile boolean isDirty = false;
 
     private boolean alwaysInactive = false;
 
@@ -1626,6 +1629,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         // Apply rate limiting to mark-delete operations
         if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
+            isDirty = true;
             lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, 
null, null);
             callback.markDeleteComplete(ctx);
             return;
@@ -2883,6 +2887,24 @@ public class ManagedCursorImpl implements ManagedCursor {
         this.entriesReadSize += readEntriesSize;
     }
 
+    void flush() {
+        if (!isDirty) {
+            return;
+        }
+
+        isDirty = false;
+        asyncMarkDelete(lastMarkDeleteEntry.newPosition, 
lastMarkDeleteEntry.properties, new MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                log.warn("[{}][{}] Failed to flush mark-delete position", 
ledger.getName(), name, exception);
+            }
+        }, null);
+    }
+
     private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
         if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
             return maxEntries;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index a2c668e..f411eff 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -108,6 +108,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
     private long lastStatTimestamp = System.nanoTime();
     private final ScheduledFuture<?> statsTask;
+    private final ScheduledFuture<?> flushCursorsTask;
 
     private final long cacheEvictionTimeThresholdNanos;
     private final MetadataStore metadataStore;
@@ -202,6 +203,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
         this.statsTask = 
scheduledExecutor.scheduleAtFixedRate(this::refreshStats, 0, 
StatsPeriodSeconds, TimeUnit.SECONDS);
+        this.flushCursorsTask = 
scheduledExecutor.scheduleAtFixedRate(this::flushCursors,
+                config.getCursorPositionFlushSeconds(), 
config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);
 
 
         this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
@@ -230,6 +233,17 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         }
     }
 
+    private synchronized void flushCursors() {
+        ledgers.values().forEach(mlfuture -> {
+            if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
+                ManagedLedgerImpl ml = mlfuture.getNow(null);
+                if (ml != null) {
+                    ml.getCursors().forEach(c -> ((ManagedCursorImpl) 
c).flush());
+                }
+            }
+        });
+    }
+
     private synchronized void refreshStats() {
         long now = System.nanoTime();
         long period = now - lastStatTimestamp;
@@ -483,6 +497,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     @Override
     public void shutdown() throws InterruptedException, ManagedLedgerException 
{
         statsTask.cancel(true);
+        flushCursorsTask.cancel(true);
 
         int numLedgers = ledgers.size();
         final CountDownLatch latch = new CountDownLatch(numLedgers);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index e53bcae..ffee4f4 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3190,8 +3190,9 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         }
         return result;
     }
-  
-    void testReadEntriesOrWaitWithMaxSize() throws Exception {
+
+    @Test
+    public void testReadEntriesOrWaitWithMaxSize() throws Exception {
         ManagedLedger ledger = 
factory.open("testReadEntriesOrWaitWithMaxSize");
         ManagedCursor c = ledger.openCursor("c");
 
@@ -3215,5 +3216,53 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         entries.forEach(e -> e.release());
     }
 
+    @Test
+    public void testFlushCursorAfterInactivity() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setThrottleMarkDelete(1.0);
+
+        ManagedLedgerFactoryConfig factoryConfig = new 
ManagedLedgerFactoryConfig();
+        factoryConfig.setCursorPositionFlushSeconds(1);
+        ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, 
bkc.getZkHandle(), factoryConfig);
+        ManagedLedger ledger1 = 
factory1.open("testFlushCursorAfterInactivity", config);
+        ManagedCursor c1 = ledger1.openCursor("c");
+        List<Position> positions = new ArrayList<Position>();
+
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        CountDownLatch latch = new CountDownLatch(positions.size());
+
+        positions.forEach(p -> c1.asyncMarkDelete(p, new MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                throw new RuntimeException(exception);
+            }
+        }, null));
+
+        latch.await();
+
+        assertEquals(c1.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+
+        // Give chance to the flush to be automatically triggered.
+        Thread.sleep(3000);
+
+        // Abruptly re-open the managed ledger without graceful close
+        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, 
bkc.getZkHandle());
+        ManagedLedger ledger2 = 
factory2.open("testFlushCursorAfterInactivity", config);
+        ManagedCursor c2 = ledger2.openCursor("c");
+
+        assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+
+        factory1.shutdown();
+        factory2.shutdown();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6e55f6f..81e1fa4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1215,6 +1215,11 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int managedLedgerDefaultAckQuorum = 2;
 
+    @FieldContext(minValue = 1,
+            category = CATEGORY_STORAGE_ML,
+            doc = "How frequently to flush the cursor positions that were 
accumulated due to rate limiting. (seconds). Default is 60 seconds")
+    private int managedLedgerCursorPositionFlushSeconds = 60;
+
     //
     //
     @FieldContext(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index a48224c..5f6cdaf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -64,6 +64,7 @@ public class ManagedLedgerClientFactory implements Closeable {
         
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
         
managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
         
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
+        
managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
 
         Configuration configuration = new ClientConfiguration();
         if (conf.isBookkeeperClientExposeStatsToPrometheus()) {

Reply via email to