Repository: kafka
Updated Branches:
  refs/heads/trunk c39e79bb5 -> 70a784b64
KAFKA-2744: Commit source task offsets after task is completely stopped

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Guozhang Wang

Closes #423 from ewencp/commit-source-offsets-after-work-thread-exits

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70a784b6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70a784b6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70a784b6

Branch: refs/heads/trunk
Commit: 70a784b64ab61bcd517619fed44419d59d467b27
Parents: c39e79b
Author: Ewen Cheslack-Postava <[email protected]>
Authored: Wed Nov 4 12:05:34 2015 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Wed Nov 4 12:05:34 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/copycat/runtime/WorkerSourceTask.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/70a784b6/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index ea9e6b5..78b588c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -96,24 +96,24 @@ class WorkerSourceTask implements WorkerTask {
     @Override
     public void stop() {
         task.stop();
-        commitOffsets();
         if (workThread != null)
             workThread.startGracefulShutdown();
     }
 
     @Override
     public boolean awaitStop(long timeoutMs) {
+        boolean success = true;
         if (workThread != null) {
             try {
-                boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
+                success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
                 if (!success)
                     workThread.forceShutdown();
-                return success;
             } catch (InterruptedException e) {
-                return false;
+                success = false;
             }
         }
-        return true;
+        commitOffsets();
+        return success;
     }
 
     @Override
