Repository: incubator-distributedlog Updated Branches: refs/heads/master 0ed872353 -> f607a48ff
DL-115: fix force get log segment logic Summary: We should remove the ledger closed check: - force to get log segments if the reader has been idle for a while after open the ledger - idleReaderWarnThreshold should be 2x larger than readLACLongPollTimeout Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/00be3e58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/00be3e58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/00be3e58 Branch: refs/heads/master Commit: 00be3e58d9a4a4e48989c8e45825a0c4911f9cdc Parents: 0ed8723 Author: Yiming Zang <yz...@twitter.com> Authored: Wed Nov 16 14:14:52 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:06:55 2016 -0800 ---------------------------------------------------------------------- .../DistributedLogConfiguration.java | 4 +- .../readahead/ReadAheadWorker.java | 46 +++++++++----------- 2 files changed, 23 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/00be3e58/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index ebb7ae2..46a056b 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -3467,8 +3467,10 @@ public class DistributedLogConfiguration extends CompositeConfiguration { */ public void validate() { Preconditions.checkArgument(getBKClientReadTimeout() * 1000 >= getReadLACLongPollTimeout(), - "Invalid timeout configuration : bkcReadTimeoutSeconds ("+getBKClientReadTimeout()+ + "Invalid timeout configuration: bkcReadTimeoutSeconds ("+getBKClientReadTimeout()+ ") should be longer than readLACLongPollTimeout ("+getReadLACLongPollTimeout()+")"); + Preconditions.checkArgument(getReaderIdleWarnThresholdMillis() > 2 * getReadLACLongPollTimeout(), + "Invalid configuration: ReaderIdleWarnThreshold should be 2x larget than readLACLongPollTimeout"); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/00be3e58/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java index 9a1911e..0217560 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java @@ -180,7 +180,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea protected final long metadataLatencyWarnThresholdMillis; final ReadAheadTracker tracker; final Stopwatch resumeStopWatch; - final Stopwatch lastLedgerCloseDetected = Stopwatch.createUnstarted(); + final Stopwatch LACNotAdvancedStopWatch = Stopwatch.createUnstarted(); // Misc private final boolean readAheadSkipBrokenEntries; // Stats @@ -901,9 +901,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea }); } else { final long lastAddConfirmed; - boolean isClosed; try { - isClosed = handleCache.isLedgerHandleClosed(currentLH); lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH); } catch (BKException ie) { // Exception is thrown due to no ledger handle @@ -912,30 +910,26 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea } if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) { - if (isClosed) { - // This indicates that the currentMetadata is still marked in - // progress while the ledger has been closed. This specific ledger - // is not going to produce any more entries - so we should - // be reading metadata entries to mark the current metadata - // as complete - if (lastLedgerCloseDetected.isRunning()) { - if (lastLedgerCloseDetected.elapsed(TimeUnit.MILLISECONDS) - > conf.getReaderIdleWarnThresholdMillis()) { - idleReaderWarn.inc(); - LOG.info("{} Ledger {} for inprogress segment {} closed for idle reader warn threshold", - new Object[] { fullyQualifiedName, currentMetadata, currentLH }); - reInitializeMetadata = true; - forceReadLogSegments = true; - } - } else { - lastLedgerCloseDetected.reset().start(); - if (conf.getTraceReadAheadMetadataChanges()) { - LOG.info("{} Ledger {} for inprogress segment {} closed", - new Object[] { fullyQualifiedName, currentMetadata, currentLH }); - } + // This indicates that the currentMetadata is still marked in + // progress while we have already read all the entries. It might + // indicate a failure to detect metadata change. So we + // should probably try force read log segments if the reader has + // been idle for after a while. + if (LACNotAdvancedStopWatch.isRunning()) { + if (LACNotAdvancedStopWatch.elapsed(TimeUnit.MILLISECONDS) + > conf.getReaderIdleWarnThresholdMillis()) { + idleReaderWarn.inc(); + LOG.info("{} Ledger {} for inprogress segment {}, reader has been idle for warn threshold {}", + new Object[] { fullyQualifiedName, currentMetadata, currentLH, conf.getReaderIdleWarnThresholdMillis() }); + reInitializeMetadata = true; + forceReadLogSegments = true; } } else { - lastLedgerCloseDetected.reset(); + LACNotAdvancedStopWatch.reset().start(); + if (conf.getTraceReadAheadMetadataChanges()) { + LOG.info("{} Ledger {} for inprogress segment {} closed", + new Object[] { fullyQualifiedName, currentMetadata, currentLH }); + } } tracker.enterPhase(ReadAheadPhase.READ_LAST_CONFIRMED); @@ -966,7 +960,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea } } } else { - lastLedgerCloseDetected.reset(); + LACNotAdvancedStopWatch.reset(); if (null != currentLH) { try { if (inProgressChanged) {