Repository: kafka Updated Branches: refs/heads/trunk 3cf2de069 -> 9355427ef
KAFKA-3710: MemoryOffsetBackingStore shutdown ExecutorService needs to be shutdown on close, lest a zombie thread prevent clean shutdown. ewencp Author: Peter Davis <[email protected]> Reviewers: Liquan Pei <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1383 from davispw/KAFKA-3710 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9355427e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9355427e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9355427e Branch: refs/heads/trunk Commit: 9355427efc4c2c6d0f0d61dd956e5ad187da2dfc Parents: 3cf2de0 Author: Peter Davis <[email protected]> Authored: Thu May 26 21:03:14 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu May 26 21:03:14 2016 -0700 ---------------------------------------------------------------------- .../storage/MemoryOffsetBackingStore.java | 24 ++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9355427e/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java index 669e5f5..e319393 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.storage; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; import org.slf4j.Logger; @@ -30,6 +31,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this @@ -40,7 +42,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore { private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class); protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>(); - protected ExecutorService executor = Executors.newSingleThreadExecutor(); + protected ExecutorService executor; public MemoryOffsetBackingStore() { @@ -51,12 +53,26 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore { } @Override - public synchronized void start() { + public void start() { + executor = Executors.newSingleThreadExecutor(); } @Override - public synchronized void stop() { - // Nothing to do since this doesn't maintain any outstanding connections/data + public void stop() { + if (executor != null) { + executor.shutdown(); + // Best effort wait for any get() and set() tasks (and caller's callbacks) to complete. + try { + executor.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (!executor.shutdownNow().isEmpty()) { + throw new ConnectException("Failed to stop MemoryOffsetBackingStore. Exiting without cleanly " + + "shutting down pending tasks and/or callbacks."); + } + executor = null; + } } @Override
