Introduce periodic keepalive control record in writer * The writer will periodically write a 'keepalive' control record to make sure the stream is alive. If the write proxy is disconnected from the bookies, the control record will fail to write, which gives the write proxy a chance to drop ownership of the stream.
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/517c77c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/517c77c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/517c77c1 Branch: refs/heads/merge/DL-98 Commit: 517c77c164cb989ae9829cbd80bf2e492eb8e364 Parents: 7b46a9a Author: Leigh Stewart <lstew...@twitter.com> Authored: Mon Dec 12 16:33:33 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Mon Dec 12 16:33:33 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKLogSegmentWriter.java | 47 ++++++++++++++++- .../DistributedLogConfiguration.java | 25 +++++++++ .../distributedlog/DistributedLogConstants.java | 1 + .../distributedlog/TestAsyncReaderWriter.java | 53 ++++++++++++++++++++ 4 files changed, 125 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java index 004b2fb..1b52951 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java @@ -72,6 +72,7 @@ import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
scala.runtime.AbstractFunction1; @@ -128,6 +129,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private long numFlushesSinceRestart = 0; private long numBytes = 0; private long lastEntryId = Long.MIN_VALUE; + private long lastTransmitNanos = Long.MIN_VALUE; + private final int periodicKeepAliveMs; // Indicates whether there are writes that have been successfully transmitted that would need // a control record to be transmitted to make them visible to the readers by updating the last @@ -137,7 +140,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private int minDelayBetweenImmediateFlushMs = 0; private Stopwatch lastTransmit; private boolean streamEnded = false; - private ScheduledFuture<?> periodicFlushSchedule = null; + private final ScheduledFuture<?> periodicFlushSchedule; + private final ScheduledFuture<?> periodicKeepAliveSchedule; final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null); final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null); final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null); @@ -312,11 +316,25 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz if (periodicFlushFrequency > 0 && scheduler != null) { periodicFlushSchedule = scheduler.scheduleAtFixedRate(this, periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS); + } else { + periodicFlushSchedule = null; } } else { // Min delay heuristic applies only when immediate flush is enabled // and transmission threshold is zero minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs(); + periodicFlushSchedule = null; + } + this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds(); + if (periodicKeepAliveMs > 0 && scheduler != null) { + periodicKeepAliveSchedule = 
scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + keepAlive(); + } + }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS); + } else { + periodicKeepAliveSchedule = null; } this.conf = conf; @@ -513,6 +531,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private void closeInternal(final boolean abort, final AtomicReference<Throwable> throwExc, final Promise<Void> closePromise) { + // Cancel the periodic keep alive schedule first + if (null != periodicKeepAliveSchedule) { + if (!periodicKeepAliveSchedule.cancel(false)) { + LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment()); + } + } + // Cancel the periodic flush schedule first // The task is allowed to exit gracefully if (null != periodicFlushSchedule) { @@ -1079,6 +1104,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } synchronized (this) { + // update the transmit timestamp + lastTransmitNanos = MathUtils.nowInNano(); + BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit); packetPrevious = packet; entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(), @@ -1293,4 +1321,21 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } } + synchronized private void keepAlive() { + if (null != closeFuture) { + // if the log segment is closing, skip sending any keep alive records. 
+ LOG.debug("Skip sending keepAlive control record since log segment {} is closing.", + getFullyQualifiedLogSegment()); + return; + } + + if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) { + return; + } + + LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT); + controlRec.setControl(); + asyncWrite(controlRec); + } + } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index d2af862..c2057df 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -249,6 +249,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration { public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false; public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = "minimumDelayBetweenImmediateFlushMilliSeconds"; public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0; + public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = "periodicKeepAliveMilliSeconds"; + public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0; // Retention/Truncation Settings public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours"; @@ -1893,6 +1895,29 @@ public class DistributedLogConfiguration extends CompositeConfiguration { return this; } + /** + * Get Periodic Keep Alive Frequency in milliseconds. 
+ * <p>If the setting is set with a positive value, it would periodically write a control record + * to keep the stream active. The default value is 0. + * + * @return periodic keep alive frequency in milliseconds. + */ + public int getPeriodicKeepAliveMilliSeconds() { + return this.getInt(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT); + } + + /** + * Set Periodic Keep Alive Frequency in milliseconds. + * + * @param keepAliveMs keep alive frequency in milliseconds. + * @return distributedlog configuration + * @see #getPeriodicKeepAliveMilliSeconds() + */ + public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) { + setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs); + return this; + } + // // DL Retention/Truncation Settings // http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java index 5c50282..32def94 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java @@ -58,6 +58,7 @@ public class DistributedLogConstants { public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs"; public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement"; static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); + public static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8); // An ACL that gives all permissions to node creators and read permissions only to everyone else. 
public static final List<ACL> EVERYONE_READ_CREATOR_ALL = http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index 8011a04..a4832b0 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -2115,4 +2115,57 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } } } + + @Test(timeout = 60000) + public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception { + String name = runtime.getMethodName(); + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(testConf); + confLocal.setOutputBufferSize(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setPeriodicKeepAliveMilliSeconds(0); + confLocal.setReaderIdleWarnThresholdMillis(20); + confLocal.setReaderIdleErrorThresholdMillis(40); + + URI uri = createDLMURI("/" + name); + ensureURICreated(uri); + + DistributedLogManager dlm = createNewDLM(confLocal, name); + BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter()); + writer.write(DLMTestUtil.getLogRecordInstance(1L)); + + AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN)); + try { + FutureUtils.result(reader.readNext()); + fail("Should fail when stream is idle"); + } catch (IdleReaderException ire) { + // expected + } + } + + @Test(timeout = 60000) + public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception { + String name = 
runtime.getMethodName(); + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(testConf); + confLocal.setOutputBufferSize(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setPeriodicKeepAliveMilliSeconds(1000); + confLocal.setReaderIdleWarnThresholdMillis(2000); + confLocal.setReaderIdleErrorThresholdMillis(4000); + + URI uri = createDLMURI("/" + name); + ensureURICreated(uri); + + DistributedLogManager dlm = createNewDLM(confLocal, name); + BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter()); + writer.write(DLMTestUtil.getLogRecordInstance(1L)); + + AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN)); + LogRecordWithDLSN record = FutureUtils.result(reader.readNext()); + assertEquals(1L, record.getTransactionId()); + DLMTestUtil.verifyLogRecord(record); + } }