Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 0ed872353 -> f607a48ff


DL-115: fix force get log segment logic

Summary:

We should remove the ledger closed check:

- force getting log segments if the reader has been idle for a while after
  opening the ledger
- idleReaderWarnThreshold should be more than 2x readLACLongPollTimeout


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/00be3e58
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/00be3e58
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/00be3e58

Branch: refs/heads/master
Commit: 00be3e58d9a4a4e48989c8e45825a0c4911f9cdc
Parents: 0ed8723
Author: Yiming Zang <yz...@twitter.com>
Authored: Wed Nov 16 14:14:52 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Thu Dec 29 02:06:55 2016 -0800

----------------------------------------------------------------------
 .../DistributedLogConfiguration.java            |  4 +-
 .../readahead/ReadAheadWorker.java              | 46 +++++++++-----------
 2 files changed, 23 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/00be3e58/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index ebb7ae2..46a056b 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -3467,8 +3467,10 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
      */
     public void validate() {
         Preconditions.checkArgument(getBKClientReadTimeout() * 1000 >= 
getReadLACLongPollTimeout(),
-            "Invalid timeout configuration : bkcReadTimeoutSeconds 
("+getBKClientReadTimeout()+
+            "Invalid timeout configuration: bkcReadTimeoutSeconds 
("+getBKClientReadTimeout()+
                 ") should be longer than readLACLongPollTimeout 
("+getReadLACLongPollTimeout()+")");
+        Preconditions.checkArgument(getReaderIdleWarnThresholdMillis() > 2 * 
getReadLACLongPollTimeout(),
+            "Invalid configuration: ReaderIdleWarnThreshold should be 2x 
larget than readLACLongPollTimeout");
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/00be3e58/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index 9a1911e..0217560 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -180,7 +180,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, AsyncClosea
     protected final long metadataLatencyWarnThresholdMillis;
     final ReadAheadTracker tracker;
     final Stopwatch resumeStopWatch;
-    final Stopwatch lastLedgerCloseDetected = Stopwatch.createUnstarted();
+    final Stopwatch LACNotAdvancedStopWatch = Stopwatch.createUnstarted();
     // Misc
     private final boolean readAheadSkipBrokenEntries;
     // Stats
@@ -901,9 +901,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, AsyncClosea
                             });
                 } else {
                     final long lastAddConfirmed;
-                    boolean isClosed;
                     try {
-                        isClosed = handleCache.isLedgerHandleClosed(currentLH);
                         lastAddConfirmed = 
handleCache.getLastAddConfirmed(currentLH);
                     } catch (BKException ie) {
                         // Exception is thrown due to no ledger handle
@@ -912,30 +910,26 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, AsyncClosea
                     }
 
                     if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) 
{
-                        if (isClosed) {
-                            // This indicates that the currentMetadata is 
still marked in
-                            // progress while the ledger has been closed. This 
specific ledger
-                            // is not going to produce any more entries - so 
we should
-                            // be reading metadata entries to mark the current 
metadata
-                            // as complete
-                            if (lastLedgerCloseDetected.isRunning()) {
-                                if 
(lastLedgerCloseDetected.elapsed(TimeUnit.MILLISECONDS)
-                                    > conf.getReaderIdleWarnThresholdMillis()) 
{
-                                    idleReaderWarn.inc();
-                                    LOG.info("{} Ledger {} for inprogress 
segment {} closed for idle reader warn threshold",
-                                        new Object[] { fullyQualifiedName, 
currentMetadata, currentLH });
-                                    reInitializeMetadata = true;
-                                    forceReadLogSegments = true;
-                                }
-                            } else {
-                                lastLedgerCloseDetected.reset().start();
-                                if (conf.getTraceReadAheadMetadataChanges()) {
-                                    LOG.info("{} Ledger {} for inprogress 
segment {} closed",
-                                        new Object[] { fullyQualifiedName, 
currentMetadata, currentLH });
-                                }
+                        // This indicates that the currentMetadata is still 
marked in
+                        // progress while we have already read all the 
entries. It might
+                        // indicate a failure to detect metadata change. So we
+                        // should probably try force read log segments if the 
reader has
+                        // been idle for after a while.
+                        if (LACNotAdvancedStopWatch.isRunning()) {
+                            if 
(LACNotAdvancedStopWatch.elapsed(TimeUnit.MILLISECONDS)
+                                > conf.getReaderIdleWarnThresholdMillis()) {
+                                idleReaderWarn.inc();
+                                LOG.info("{} Ledger {} for inprogress segment 
{}, reader has been idle for warn threshold {}",
+                                    new Object[] { fullyQualifiedName, 
currentMetadata, currentLH, conf.getReaderIdleWarnThresholdMillis() });
+                                reInitializeMetadata = true;
+                                forceReadLogSegments = true;
                             }
                         } else {
-                            lastLedgerCloseDetected.reset();
+                            LACNotAdvancedStopWatch.reset().start();
+                            if (conf.getTraceReadAheadMetadataChanges()) {
+                                LOG.info("{} Ledger {} for inprogress segment 
{} closed",
+                                    new Object[] { fullyQualifiedName, 
currentMetadata, currentLH });
+                            }
                         }
 
                         tracker.enterPhase(ReadAheadPhase.READ_LAST_CONFIRMED);
@@ -966,7 +960,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, AsyncClosea
                     }
                 }
             } else {
-                lastLedgerCloseDetected.reset();
+                LACNotAdvancedStopWatch.reset();
                 if (null != currentLH) {
                     try {
                         if (inProgressChanged) {

Reply via email to