Be able to close the writer within a timeout period RB_ID=841340
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/0a18f564 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/0a18f564 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/0a18f564 Branch: refs/heads/merge/DL-98 Commit: 0a18f564ba7e4d03f4c6a0859e5478535a59befd Parents: 800b867 Author: Leigh Stewart <lstew...@twitter.com> Authored: Mon Dec 12 16:46:11 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Mon Dec 12 16:46:11 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKAsyncLogWriter.java | 33 +++++++++++++------- .../distributedlog/util/FutureUtils.java | 20 +++++++++++- .../service/config/ServerConfiguration.java | 25 +++++++++++++++ .../service/stream/StreamFactoryImpl.java | 6 +++- .../service/stream/StreamImpl.java | 32 +++++++++++++++++-- 5 files changed, 99 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java index f1594f9..79f5f5e 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java @@ -212,8 +212,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri boolean rollLog, boolean allowMaxTxID) { Stopwatch stopwatch = Stopwatch.createStarted(); - return doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, allowMaxTxID) - .addEventListener(new OpStatsListener<BKLogSegmentWriter>(getWriterOpStatsLogger, stopwatch)); + return FutureUtils.stats( + doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, allowMaxTxID), + getWriterOpStatsLogger, + stopwatch); } private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long firstTxid, @@ -415,8 +417,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri @Override public Future<DLSN> write(final LogRecord record) { final Stopwatch stopwatch = Stopwatch.createStarted(); - return asyncWrite(record, true) - .addEventListener(new OpStatsListener<DLSN>(writeOpStatsLogger, stopwatch)); + return FutureUtils.stats( + asyncWrite(record, true), + writeOpStatsLogger, + stopwatch); } /** @@ -430,8 +434,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri @Override public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) { final Stopwatch stopwatch = Stopwatch.createStarted(); - return Future.value(asyncWriteBulk(records)) - .addEventListener(new OpStatsListener<List<Future<DLSN>>>(bulkWriteOpStatsLogger, stopwatch)); + return FutureUtils.stats( + Future.value(asyncWriteBulk(records)), + bulkWriteOpStatsLogger, + stopwatch); } @Override @@ -478,12 +484,15 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri logSegmentWriterFuture = getLogSegmentWriterForEndOfStream(); } - return logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() { - @Override - public Future<Long> apply(BKLogSegmentWriter w) { - return w.markEndOfStream(); - } - }).addEventListener(new OpStatsListener<Long>(markEndOfStreamOpStatsLogger, stopwatch)); + return FutureUtils.stats( + logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() { + @Override + public Future<Long> apply(BKLogSegmentWriter w) { + return w.markEndOfStream(); + } + }), + markEndOfStreamOpStatsLogger, + stopwatch); } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java index f0540d7..6a5f7a7 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java @@ -17,13 +17,15 @@ */ package com.twitter.distributedlog.util; -import com.twitter.distributedlog.exceptions.BKTransmitException; +import com.google.common.base.Stopwatch; import com.twitter.distributedlog.DistributedLogConstants; +import com.twitter.distributedlog.exceptions.BKTransmitException; import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.exceptions.ZKException; +import com.twitter.distributedlog.stats.OpStatsListener; import com.twitter.util.Await; import com.twitter.util.Duration; import com.twitter.util.Function; @@ -34,6 +36,7 @@ import com.twitter.util.Promise; import com.twitter.util.Return; import com.twitter.util.Throw; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -225,6 +228,21 @@ public class FutureUtils { } /** + * Add a event listener over <i>result</i> for collecting the operation stats. + * + * @param result result to listen on + * @param opStatsLogger stats logger to record operations stats + * @param stopwatch stop watch to time operation + * @param <T> + * @return result after registered the event listener + */ + public static <T> Future<T> stats(Future<T> result, + OpStatsLogger opStatsLogger, + Stopwatch stopwatch) { + return result.addEventListener(new OpStatsListener<T>(opStatsLogger, stopwatch)); + } + + /** * Await for the result of the future and thrown bk related exceptions. * * @param result future to wait for http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java index 90ff6e6..09661c0 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java @@ -79,6 +79,10 @@ public class ServerConfiguration extends CompositeConfiguration { public static final String SERVER_SERVICE_TIMEOUT_MS_OLD = "serviceTimeoutMs"; public static final long SERVER_SERVICE_TIMEOUT_MS_DEFAULT = 0; + // Server close writer timeout + public static final String SERVER_WRITER_CLOSE_TIMEOUT_MS = "server_writer_close_timeout_ms"; + public static final long SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT = 1000; + // Server stream probation timeout public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms"; public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs"; @@ -296,6 +300,27 @@ public class ServerConfiguration extends CompositeConfiguration { } /** + * Get timeout for closing writer in proxy layer. 0 disables timeout. + * + * @return timeout for closing writer in proxy layer. + */ + public long getWriterCloseTimeoutMs() { + return getLong(SERVER_WRITER_CLOSE_TIMEOUT_MS, SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT); + } + + /** + * Set timeout for closing writer in proxy layer. 0 disables timeout. + * + * @param timeoutMs + * timeout for closing writer in proxy layer. + * @return dl configuration. + */ + public ServerConfiguration setWriterCloseTimeoutMs(long timeoutMs) { + setProperty(SERVER_WRITER_CLOSE_TIMEOUT_MS, timeoutMs); + return this; + } + + /** * After service timeout, how long should stream be kept in cache in probationary state in order * to prevent reacquire. In millisec. * http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java index bc53fe3..cb28f1e 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java @@ -25,6 +25,7 @@ import com.twitter.distributedlog.service.config.ServerConfiguration; import com.twitter.distributedlog.service.config.StreamConfigProvider; import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.util.Timer; import org.apache.bookkeeper.feature.FeatureProvider; import org.jboss.netty.util.HashedWheelTimer; @@ -40,6 +41,7 @@ public class StreamFactoryImpl implements StreamFactory { private final OrderedScheduler scheduler; private final FatalErrorHandler fatalErrorHandler; private final HashedWheelTimer requestTimer; + private final Timer futureTimer; public StreamFactoryImpl(String clientId, StreamOpStats streamOpStats, @@ -64,6 +66,7 @@ public class StreamFactoryImpl implements StreamFactory { this.scheduler = scheduler; this.fatalErrorHandler = fatalErrorHandler; this.requestTimer = requestTimer; + this.futureTimer = new com.twitter.finagle.util.HashedWheelTimer(requestTimer); } @Override @@ -83,6 +86,7 @@ public class StreamFactoryImpl implements StreamFactory { dlNamespace, scheduler, fatalErrorHandler, - requestTimer); + requestTimer, + futureTimer); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java index 45630fe..1204d39 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java @@ -45,10 +45,13 @@ import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.TimeSequencer; import com.twitter.distributedlog.util.Utils; +import com.twitter.util.Duration; import com.twitter.util.Function0; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; +import com.twitter.util.TimeoutException; +import com.twitter.util.Timer; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.Counter; @@ -141,9 +144,11 @@ public class StreamImpl implements Stream { private final StreamConfigProvider streamConfigProvider; private final FatalErrorHandler fatalErrorHandler; private final long streamProbationTimeoutMs; - private long serviceTimeoutMs; + private final long serviceTimeoutMs; + private final long writerCloseTimeoutMs; private final boolean failFastOnStreamNotReady; private final HashedWheelTimer requestTimer; + private final Timer futureTimer; // Stats private final StatsLogger streamLogger; @@ -151,8 +156,10 @@ public class StreamImpl implements Stream { private final StatsLogger limiterStatLogger; private final Counter serviceTimeout; private final OpStatsLogger streamAcquireStat; + private final OpStatsLogger writerCloseStatLogger; private final Counter pendingOpsCounter; private final Counter unexpectedExceptions; + private final Counter writerCloseTimeoutCounter; private final StatsLogger exceptionStatLogger; private final ConcurrentHashMap<String, Counter> exceptionCounters = new ConcurrentHashMap<String, Counter>(); @@ -173,7 +180,8 @@ public class StreamImpl implements Stream { DistributedLogNamespace dlNamespace, OrderedScheduler scheduler, FatalErrorHandler fatalErrorHandler, - HashedWheelTimer requestTimer) { + HashedWheelTimer requestTimer, + Timer futureTimer) { this.clientId = clientId; this.dlConfig = dlConfig; this.streamManager = streamManager; @@ -189,6 +197,7 @@ public class StreamImpl implements Stream { this.scheduler = scheduler; this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs(); this.streamProbationTimeoutMs = serverConfig.getStreamProbationTimeoutMs(); + this.writerCloseTimeoutMs = serverConfig.getWriterCloseTimeoutMs(); this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady(); this.fatalErrorHandler = fatalErrorHandler; this.dynConf = streamConf; @@ -197,6 +206,7 @@ public class StreamImpl implements Stream { streamOpStats.streamRequestScope(partition, "limiter")); this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled); this.requestTimer = requestTimer; + this.futureTimer = futureTimer; // Stats this.streamLogger = streamOpStats.streamRequestStatsLogger(partition); @@ -208,6 +218,8 @@ public class StreamImpl implements Stream { this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops"); this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions"); this.exceptionStatLogger = streamOpStats.requestScope("exceptions"); + this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close"); + this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts"); } @Override @@ -953,7 +965,18 @@ public class StreamImpl implements Stream { closeWriterFuture = Utils.asyncClose(writer, true); } // close the manager and error out pending requests after close writer - closeWriterFuture.addEventListener(FutureUtils.OrderedFutureEventListener.of( + Duration closeWaitDuration; + if (writerCloseTimeoutMs <= 0) { + closeWaitDuration = Duration.Top(); + } else { + closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs); + } + FutureUtils.stats( + closeWriterFuture, + writerCloseStatLogger, + Stopwatch.createStarted() + ).masked().within(futureTimer, closeWaitDuration) + .addEventListener(FutureUtils.OrderedFutureEventListener.of( new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { @@ -962,6 +985,9 @@ public class StreamImpl implements Stream { } @Override public void onFailure(Throwable cause) { + if (cause instanceof TimeoutException) { + writerCloseTimeoutCounter.inc(); + } closeManagerAndErrorOutPendingRequests(); FutureUtils.setValue(closePromise, null); }