Introduce periodic keepalive control record in writer

    * so the writer will periodically write 'keepalive' control record to make 
sure the stream is alive. so if the write proxy is disconnected from bookies, 
the control record will fail to write hence write proxy gets a chance to drop 
the ownership.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/517c77c1
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/517c77c1
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/517c77c1

Branch: refs/heads/merge/DL-98
Commit: 517c77c164cb989ae9829cbd80bf2e492eb8e364
Parents: 7b46a9a
Author: Leigh Stewart <lstew...@twitter.com>
Authored: Mon Dec 12 16:33:33 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Mon Dec 12 16:33:33 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKLogSegmentWriter.java      | 47 ++++++++++++++++-
 .../DistributedLogConfiguration.java            | 25 +++++++++
 .../distributedlog/DistributedLogConstants.java |  1 +
 .../distributedlog/TestAsyncReaderWriter.java   | 53 ++++++++++++++++++++
 4 files changed, 125 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 004b2fb..1b52951 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction1;
@@ -128,6 +129,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     private long numFlushesSinceRestart = 0;
     private long numBytes = 0;
     private long lastEntryId = Long.MIN_VALUE;
+    private long lastTransmitNanos = Long.MIN_VALUE;
+    private final int periodicKeepAliveMs;
 
     // Indicates whether there are writes that have been successfully 
transmitted that would need
     // a control record to be transmitted to make them visible to the readers 
by updating the last
@@ -137,7 +140,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     private int minDelayBetweenImmediateFlushMs = 0;
     private Stopwatch lastTransmit;
     private boolean streamEnded = false;
-    private ScheduledFuture<?> periodicFlushSchedule = null;
+    private final ScheduledFuture<?> periodicFlushSchedule;
+    private final ScheduledFuture<?> periodicKeepAliveSchedule;
     final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = 
new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = 
new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<Exception> scheduledFlushException = new 
AtomicReference<Exception>(null);
@@ -312,11 +316,25 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             if (periodicFlushFrequency > 0 && scheduler != null) {
                 periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
                         periodicFlushFrequency/2, periodicFlushFrequency/2, 
TimeUnit.MILLISECONDS);
+            } else {
+                periodicFlushSchedule = null;
             }
         } else {
             // Min delay heuristic applies only when immediate flush is enabled
             // and transmission threshold is zero
             minDelayBetweenImmediateFlushMs = 
conf.getMinDelayBetweenImmediateFlushMs();
+            periodicFlushSchedule = null;
+        }
+        this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
+        if (periodicKeepAliveMs > 0 && scheduler != null) {
+            periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new 
Runnable() {
+                @Override
+                public void run() {
+                    keepAlive();
+                }
+            }, periodicKeepAliveMs, periodicKeepAliveMs, 
TimeUnit.MILLISECONDS);
+        } else {
+            periodicKeepAliveSchedule = null;
         }
 
         this.conf = conf;
@@ -513,6 +531,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     private void closeInternal(final boolean abort,
                                final AtomicReference<Throwable> throwExc,
                                final Promise<Void> closePromise) {
+        // Cancel the periodic keep alive schedule first
+        if (null != periodicKeepAliveSchedule) {
+            if (!periodicKeepAliveSchedule.cancel(false)) {
+                LOG.info("Periodic keepalive for log segment {} isn't 
cancelled.", getFullyQualifiedLogSegment());
+            }
+        }
+
         // Cancel the periodic flush schedule first
         // The task is allowed to exit gracefully
         if (null != periodicFlushSchedule) {
@@ -1079,6 +1104,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             }
 
             synchronized (this) {
+                // update the transmit timestamp
+                lastTransmitNanos = MathUtils.nowInNano();
+
                 BKTransmitPacket packet = new 
BKTransmitPacket(recordSetToTransmit);
                 packetPrevious = packet;
                 entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
@@ -1293,4 +1321,21 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         }
     }
 
+    synchronized private void keepAlive() {
+        if (null != closeFuture) {
+            // if the log segment is closing, skip sending any keep alive 
records.
+            LOG.debug("Skip sending keepAlive control record since log segment 
{} is closing.",
+                    getFullyQualifiedLogSegment());
+            return;
+        }
+
+        if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
+            return;
+        }
+
+        LogRecord controlRec = new LogRecord(lastTxId, 
DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
+        controlRec.setControl();
+        asyncWrite(controlRec);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index d2af862..c2057df 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -249,6 +249,8 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
     public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false;
     public static final String 
BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = 
"minimumDelayBetweenImmediateFlushMilliSeconds";
     public static final int 
BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0;
+    public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = 
"periodicKeepAliveMilliSeconds";
+    public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0;
 
     // Retention/Truncation Settings
     public static final String BKDL_RETENTION_PERIOD_IN_HOURS = 
"logSegmentRetentionHours";
@@ -1893,6 +1895,29 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
         return this;
     }
 
+    /**
+     * Get Periodic Keep Alive Frequency in milliseconds.
+     * <p>If the setting is set with a positive value, it would periodically 
write a control record
+     * to keep the stream active. The default value is 0.
+     *
+     * @return periodic keep alive frequency in milliseconds.
+     */
+    public int getPeriodicKeepAliveMilliSeconds() {
+        return this.getInt(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, 
BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT);
+    }
+
+    /**
+     * Set Periodic Keep Alive Frequency in milliseconds.
+     *
+     * @param keepAliveMs keep alive frequency in milliseconds.
+     * @return distributedlog configuration
+     * @see #getPeriodicKeepAliveMilliSeconds()
+     */
+    public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int 
keepAliveMs) {
+        setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs);
+        return this;
+    }
+
     //
     // DL Retention/Truncation Settings
     //

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
index 5c50282..32def94 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
@@ -58,6 +58,7 @@ public class DistributedLogConstants {
     public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
     public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = 
"disallow_bookie_placement";
     static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+    public static final byte[] KEEPALIVE_RECORD_CONTENT = 
"keepalive".getBytes(UTF_8);
 
     // An ACL that gives all permissions to node creators and read permissions 
only to everyone else.
     public static final List<ACL> EVERYONE_READ_CREATOR_ALL =

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 8011a04..a4832b0 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -2115,4 +2115,57 @@ public class TestAsyncReaderWriter extends 
TestDistributedLogBase {
             }
         }
     }
+
+    @Test(timeout = 60000)
+    public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws 
Exception {
+        String name = runtime.getMethodName();
+        DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
+        confLocal.addConfiguration(testConf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setPeriodicKeepAliveMilliSeconds(0);
+        confLocal.setReaderIdleWarnThresholdMillis(20);
+        confLocal.setReaderIdleErrorThresholdMillis(40);
+
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+
+        DistributedLogManager dlm = createNewDLM(confLocal, name);
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) 
FutureUtils.result(dlm.openAsyncLogWriter());
+        writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+        AsyncLogReader reader = 
FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        try {
+            FutureUtils.result(reader.readNext());
+            fail("Should fail when stream is idle");
+        } catch (IdleReaderException ire) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws 
Exception {
+        String name = runtime.getMethodName();
+        DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
+        confLocal.addConfiguration(testConf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setPeriodicKeepAliveMilliSeconds(1000);
+        confLocal.setReaderIdleWarnThresholdMillis(2000);
+        confLocal.setReaderIdleErrorThresholdMillis(4000);
+
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+
+        DistributedLogManager dlm = createNewDLM(confLocal, name);
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) 
FutureUtils.result(dlm.openAsyncLogWriter());
+        writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+        AsyncLogReader reader = 
FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
+        assertEquals(1L, record.getTransactionId());
+        DLMTestUtil.verifyLogRecord(record);
+    }
 }

Reply via email to