Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 6e67a377f -> 26942a91a


DL-91: Be able to close writer within a timeout duration

merge twitter's change on closing writer.

Author: Sijie Guo <sij...@twitter.com>
Author: Sijie Guo <si...@apache.org>
Author: Jordan Bull <jb...@twitter.com>
Author: Leigh Stewart <lstew...@twitter.com>
Author: Dave Rusek <dave.ru...@gmail.com>
Author: Dave Rusek <dru...@twitter.com>

Reviewers: Leigh Stewart <lstew...@apache.org>

Closes #64 from sijie/merge/DL-91


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/26942a91
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/26942a91
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/26942a91

Branch: refs/heads/master
Commit: 26942a91a67e1d0052b46db278e7ba8bdb414269
Parents: 6e67a37
Author: Sijie Guo <sij...@twitter.com>
Authored: Tue Dec 20 00:04:51 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Dec 20 00:04:51 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogWriter.java        | 33 +++++++++++++-------
 .../distributedlog/util/FutureUtils.java        | 20 +++++++++++-
 .../service/config/ServerConfiguration.java     | 25 +++++++++++++++
 .../service/stream/StreamFactoryImpl.java       |  6 +++-
 .../service/stream/StreamImpl.java              | 32 +++++++++++++++++--
 5 files changed, 99 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/26942a91/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
index f1594f9..79f5f5e 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
@@ -212,8 +212,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWri
                                                            boolean rollLog,
                                                            boolean 
allowMaxTxID) {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        return doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, 
allowMaxTxID)
-                .addEventListener(new 
OpStatsListener<BKLogSegmentWriter>(getWriterOpStatsLogger, stopwatch));
+        return FutureUtils.stats(
+                doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, 
allowMaxTxID),
+                getWriterOpStatsLogger,
+                stopwatch);
     }
 
     private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long 
firstTxid,
@@ -415,8 +417,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWri
     @Override
     public Future<DLSN> write(final LogRecord record) {
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        return asyncWrite(record, true)
-                .addEventListener(new 
OpStatsListener<DLSN>(writeOpStatsLogger, stopwatch));
+        return FutureUtils.stats(
+                asyncWrite(record, true),
+                writeOpStatsLogger,
+                stopwatch);
     }
 
     /**
@@ -430,8 +434,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWri
     @Override
     public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) 
{
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        return Future.value(asyncWriteBulk(records))
-                .addEventListener(new 
OpStatsListener<List<Future<DLSN>>>(bulkWriteOpStatsLogger, stopwatch));
+        return FutureUtils.stats(
+                Future.value(asyncWriteBulk(records)),
+                bulkWriteOpStatsLogger,
+                stopwatch);
     }
 
     @Override
@@ -478,12 +484,15 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWri
             logSegmentWriterFuture = getLogSegmentWriterForEndOfStream();
         }
 
-        return logSegmentWriterFuture.flatMap(new 
AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
-            @Override
-            public Future<Long> apply(BKLogSegmentWriter w) {
-                return w.markEndOfStream();
-            }
-        }).addEventListener(new 
OpStatsListener<Long>(markEndOfStreamOpStatsLogger, stopwatch));
+        return FutureUtils.stats(
+                logSegmentWriterFuture.flatMap(new 
AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
+                    @Override
+                    public Future<Long> apply(BKLogSegmentWriter w) {
+                        return w.markEndOfStream();
+                    }
+                }),
+                markEndOfStreamOpStatsLogger,
+                stopwatch);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/26942a91/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
index f0540d7..6a5f7a7 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
@@ -17,13 +17,15 @@
  */
 package com.twitter.distributedlog.util;
 
-import com.twitter.distributedlog.exceptions.BKTransmitException;
+import com.google.common.base.Stopwatch;
 import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.exceptions.BKTransmitException;
 import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.stats.OpStatsListener;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.Function;
@@ -34,6 +36,7 @@ import com.twitter.util.Promise;
 import com.twitter.util.Return;
 import com.twitter.util.Throw;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -225,6 +228,21 @@ public class FutureUtils {
     }
 
     /**
+     * Add a event listener over <i>result</i> for collecting the operation 
stats.
+     *
+     * @param result result to listen on
+     * @param opStatsLogger stats logger to record operations stats
+     * @param stopwatch stop watch to time operation
+     * @param <T>
+     * @return result after registered the event listener
+     */
+    public static <T> Future<T> stats(Future<T> result,
+                                      OpStatsLogger opStatsLogger,
+                                      Stopwatch stopwatch) {
+        return result.addEventListener(new OpStatsListener<T>(opStatsLogger, 
stopwatch));
+    }
+
+    /**
      * Await for the result of the future and thrown bk related exceptions.
      *
      * @param result future to wait for

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/26942a91/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
index 90ff6e6..09661c0 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
@@ -79,6 +79,10 @@ public class ServerConfiguration extends 
CompositeConfiguration {
     public static final String SERVER_SERVICE_TIMEOUT_MS_OLD = 
"serviceTimeoutMs";
     public static final long SERVER_SERVICE_TIMEOUT_MS_DEFAULT = 0;
 
+    // Server close writer timeout
+    public static final String SERVER_WRITER_CLOSE_TIMEOUT_MS = 
"server_writer_close_timeout_ms";
+    public static final long SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT = 1000;
+
     // Server stream probation timeout
     public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = 
"server_stream_probation_timeout_ms";
     public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = 
"streamProbationTimeoutMs";
@@ -296,6 +300,27 @@ public class ServerConfiguration extends 
CompositeConfiguration {
     }
 
     /**
+     * Get timeout for closing writer in proxy layer. 0 disables timeout.
+     *
+     * @return timeout for closing writer in proxy layer.
+     */
+    public long getWriterCloseTimeoutMs() {
+        return getLong(SERVER_WRITER_CLOSE_TIMEOUT_MS, 
SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT);
+    }
+
+    /**
+     * Set timeout for closing writer in proxy layer. 0 disables timeout.
+     *
+     * @param timeoutMs
+     *          timeout for closing writer in proxy layer.
+     * @return dl configuration.
+     */
+    public ServerConfiguration setWriterCloseTimeoutMs(long timeoutMs) {
+        setProperty(SERVER_WRITER_CLOSE_TIMEOUT_MS, timeoutMs);
+        return this;
+    }
+
+    /**
      * After service timeout, how long should stream be kept in cache in 
probationary state in order
      * to prevent reacquire. In millisec.
      *

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/26942a91/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
index bc53fe3..cb28f1e 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
@@ -25,6 +25,7 @@ import 
com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.config.StreamConfigProvider;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Timer;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.jboss.netty.util.HashedWheelTimer;
 
@@ -40,6 +41,7 @@ public class StreamFactoryImpl implements StreamFactory {
     private final OrderedScheduler scheduler;
     private final FatalErrorHandler fatalErrorHandler;
     private final HashedWheelTimer requestTimer;
+    private final Timer futureTimer;
 
     public StreamFactoryImpl(String clientId,
         StreamOpStats streamOpStats,
@@ -64,6 +66,7 @@ public class StreamFactoryImpl implements StreamFactory {
         this.scheduler = scheduler;
         this.fatalErrorHandler = fatalErrorHandler;
         this.requestTimer = requestTimer;
+        this.futureTimer = new 
com.twitter.finagle.util.HashedWheelTimer(requestTimer);
     }
 
     @Override
@@ -83,6 +86,7 @@ public class StreamFactoryImpl implements StreamFactory {
             dlNamespace,
             scheduler,
             fatalErrorHandler,
-            requestTimer);
+            requestTimer,
+            futureTimer);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/26942a91/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 45630fe..1204d39 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -45,10 +45,13 @@ import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.TimeSequencer;
 import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Duration;
 import com.twitter.util.Function0;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
+import com.twitter.util.TimeoutException;
+import com.twitter.util.Timer;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.Counter;
@@ -141,9 +144,11 @@ public class StreamImpl implements Stream {
     private final StreamConfigProvider streamConfigProvider;
     private final FatalErrorHandler fatalErrorHandler;
     private final long streamProbationTimeoutMs;
-    private long serviceTimeoutMs;
+    private final long serviceTimeoutMs;
+    private final long writerCloseTimeoutMs;
     private final boolean failFastOnStreamNotReady;
     private final HashedWheelTimer requestTimer;
+    private final Timer futureTimer;
 
     // Stats
     private final StatsLogger streamLogger;
@@ -151,8 +156,10 @@ public class StreamImpl implements Stream {
     private final StatsLogger limiterStatLogger;
     private final Counter serviceTimeout;
     private final OpStatsLogger streamAcquireStat;
+    private final OpStatsLogger writerCloseStatLogger;
     private final Counter pendingOpsCounter;
     private final Counter unexpectedExceptions;
+    private final Counter writerCloseTimeoutCounter;
     private final StatsLogger exceptionStatLogger;
     private final ConcurrentHashMap<String, Counter> exceptionCounters =
         new ConcurrentHashMap<String, Counter>();
@@ -173,7 +180,8 @@ public class StreamImpl implements Stream {
                DistributedLogNamespace dlNamespace,
                OrderedScheduler scheduler,
                FatalErrorHandler fatalErrorHandler,
-               HashedWheelTimer requestTimer) {
+               HashedWheelTimer requestTimer,
+               Timer futureTimer) {
         this.clientId = clientId;
         this.dlConfig = dlConfig;
         this.streamManager = streamManager;
@@ -189,6 +197,7 @@ public class StreamImpl implements Stream {
         this.scheduler = scheduler;
         this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs();
         this.streamProbationTimeoutMs = 
serverConfig.getStreamProbationTimeoutMs();
+        this.writerCloseTimeoutMs = serverConfig.getWriterCloseTimeoutMs();
         this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady();
         this.fatalErrorHandler = fatalErrorHandler;
         this.dynConf = streamConf;
@@ -197,6 +206,7 @@ public class StreamImpl implements Stream {
             streamOpStats.streamRequestScope(partition, "limiter"));
         this.limiter = new StreamRequestLimiter(name, dynConf, 
limiterStatsLogger, featureRateLimitDisabled);
         this.requestTimer = requestTimer;
+        this.futureTimer = futureTimer;
 
         // Stats
         this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
@@ -208,6 +218,8 @@ public class StreamImpl implements Stream {
         this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops");
         this.unexpectedExceptions = 
streamOpStats.baseCounter("unexpected_exceptions");
         this.exceptionStatLogger = streamOpStats.requestScope("exceptions");
+        this.writerCloseStatLogger = 
streamsStatsLogger.getOpStatsLogger("writer_close");
+        this.writerCloseTimeoutCounter = 
streamsStatsLogger.getCounter("writer_close_timeouts");
     }
 
     @Override
@@ -953,7 +965,18 @@ public class StreamImpl implements Stream {
             closeWriterFuture = Utils.asyncClose(writer, true);
         }
         // close the manager and error out pending requests after close writer
-        
closeWriterFuture.addEventListener(FutureUtils.OrderedFutureEventListener.of(
+        Duration closeWaitDuration;
+        if (writerCloseTimeoutMs <= 0) {
+            closeWaitDuration = Duration.Top();
+        } else {
+            closeWaitDuration = 
Duration.fromMilliseconds(writerCloseTimeoutMs);
+        }
+        FutureUtils.stats(
+                closeWriterFuture,
+                writerCloseStatLogger,
+                Stopwatch.createStarted()
+        ).masked().within(futureTimer, closeWaitDuration)
+                .addEventListener(FutureUtils.OrderedFutureEventListener.of(
                 new FutureEventListener<Void>() {
                     @Override
                     public void onSuccess(Void value) {
@@ -962,6 +985,9 @@ public class StreamImpl implements Stream {
                     }
                     @Override
                     public void onFailure(Throwable cause) {
+                        if (cause instanceof TimeoutException) {
+                            writerCloseTimeoutCounter.inc();
+                        }
                         closeManagerAndErrorOutPendingRequests();
                         FutureUtils.setValue(closePromise, null);
                     }

Reply via email to