DL-119: Fix the logging on closing readahead worker
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/34fa16b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/34fa16b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/34fa16b1 Branch: refs/heads/master Commit: 34fa16b1d7ab1ae887b1114e8e2aabdffe16608a Parents: dc4548b Author: Yiming Zang <yz...@twitter.com> Authored: Wed Nov 30 18:21:08 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:09:40 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/util/FutureUtils.java | 21 ++++++++++++++++---- .../MonitoredScheduledThreadPoolExecutor.java | 4 ++-- 2 files changed, 19 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/34fa16b1/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java index 6a647a9..266409e 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java @@ -35,11 +35,13 @@ import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; import com.twitter.util.Return; import com.twitter.util.Throw; +import com.twitter.util.Try; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import javax.annotation.Nullable; @@ -384,14 +386,25 @@ public class FutureUtils { if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) { return promise; } - scheduler.schedule(key, new Runnable() { + // schedule a timeout to raise timeout exception + final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() { @Override public void run() { - logger.info("Raise exception", cause); - // satisfy the promise - FutureUtils.setException(promise, cause); + if (!promise.isDefined() && FutureUtils.setException(promise, cause)) { + logger.info("Raise exception", cause); + } } }, timeout, unit); + // when the promise is satisfied, cancel the timeout task + promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() { + @Override + public BoxedUnit apply(Try<T> value) { + if (!task.cancel(true)) { + logger.debug("Failed to cancel the timeout task"); + } + return BoxedUnit.UNIT; + } + }); return promise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/34fa16b1/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java index 512a456..75223f2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java @@ -237,9 +237,9 @@ public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExe try { ((Future<?>) runnable).get(); } catch (CancellationException e) { - LOG.info("Task {} cancelled", runnable, e.getCause()); + LOG.debug("Task {} cancelled", runnable, e.getCause()); } catch (InterruptedException e) { - LOG.info("Task {} was interrupted", runnable, e); + LOG.debug("Task {} was interrupted", runnable, e); } catch (ExecutionException e) { return e.getCause(); }