(TWILL-133) Make the ZKClient stoppable even if it is still trying to connect - Also include refactoring of the DefaultZKClient class to simplify shutdown logic - Fix a thread leaking bug in LeaderElection
This closes #41 on GitHub Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/2054c5f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/2054c5f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/2054c5f9 Branch: refs/heads/site Commit: 2054c5f9364effabfa3008a3a9043b08b1d1b64c Parents: ca33245 Author: Terence Yim <[email protected]> Authored: Fri Jun 12 17:13:05 2015 -0700 Committer: Terence Yim <[email protected]> Committed: Mon Jun 15 19:20:41 2015 -0700 ---------------------------------------------------------------------- .../zookeeper/DefaultZKClientService.java | 164 +++++++++++++------ .../internal/zookeeper/LeaderElection.java | 12 +- .../apache/twill/zookeeper/ZKClientTest.java | 85 +++++++--- 3 files changed, 185 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2054c5f9/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java ---------------------------------------------------------------------- diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java index 7b9b345..dc2bfa9 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java @@ -58,7 +58,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import 
javax.annotation.Nullable; @@ -91,11 +90,11 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK Watcher connectionWatcher, Multimap<String, byte[]> authInfos) { this.zkStr = zkStr; this.sessionTimeout = sessionTimeout; - this.connectionWatchers = new CopyOnWriteArrayList<Watcher>(); + this.connectionWatchers = new CopyOnWriteArrayList<>(); this.authInfos = copyAuthInfo(authInfos); addConnectionWatcher(connectionWatcher); - this.zooKeeper = new AtomicReference<ZooKeeper>(); + this.zooKeeper = new AtomicReference<>(); serviceDelegate = new ServiceDelegate(); } @@ -111,7 +110,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK } @Override - public Cancellable addConnectionWatcher(Watcher watcher) { + public Cancellable addConnectionWatcher(final Watcher watcher) { if (watcher == null) { return new Cancellable() { @Override @@ -121,12 +120,13 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK }; } - final Watcher wrappedWatcher = wrapWatcher(watcher); - connectionWatchers.add(wrappedWatcher); + // Invocation of connection watchers are already done inside the event thread, + // hence no need to wrap the watcher again. 
+ connectionWatchers.add(watcher); return new Cancellable() { @Override public void cancel() { - connectionWatchers.remove(wrappedWatcher); + connectionWatchers.remove(watcher); } }; } @@ -391,25 +391,57 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK private final class ServiceDelegate extends AbstractService implements Watcher { - private final AtomicBoolean stopNotified = new AtomicBoolean(false); - private volatile boolean executorStopped; + private final Runnable stopTask; - @Override - protected void doStart() { - // A single thread executor - eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), - Threads.createDaemonThreadFactory("zk-client-EventThread")) { + private ServiceDelegate() { + // Creates the stop task runnable in constructor so that if the stop() method is called from shutdown hook, + // it won't fail with class loading error due to failure to load inner class from the shutdown thread. + this.stopTask = createStopTask(); + + // Add a listener for state changes so that we can terminate the service even it is in STARTING state upoon + // stop is requested + addListener(new Listener() { @Override - protected void terminated() { - super.terminated(); + public void starting() { + // no-op + } - // Only call notifyStopped if the executor.shutdown() returned, otherwise deadlock (TWILL-110) can occur. - // Also, notifyStopped() should only be called once. - if (executorStopped && stopNotified.compareAndSet(false, true)) { - notifyStopped(); + @Override + public void running() { + // no-op + } + + @Override + public void stopping(State from) { + if (from == State.STARTING) { + // If it is still starting, just notify that it's started to transit out of the STARTING phase. 
+ notifyStarted(); } } - }; + + @Override + public void terminated(State from) { + // no-op + } + + @Override + public void failed(State from, Throwable failure) { + eventExecutor.shutdownNow(); + // Close the ZK client if there is exception. It is needed because the stop task may not get executed + closeZooKeeper(zooKeeper.getAndSet(null)); + } + }, Threads.SAME_THREAD_EXECUTOR); + } + + @Override + protected void doStart() { + // A single thread executor for all events + ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + Threads.createDaemonThreadFactory("zk-client-EventThread")); + // Just discard the execution if the executor is closed + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + eventExecutor = executor; try { zooKeeper.set(createZooKeeper()); @@ -420,29 +452,21 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK @Override protected void doStop() { - ZooKeeper zk = zooKeeper.getAndSet(null); - if (zk != null) { - try { - zk.close(); - } catch (InterruptedException e) { - notifyFailed(e); - } finally { - eventExecutor.shutdown(); - executorStopped = true; - - // If the executor state is terminated, meaning the terminate() method is triggered, - // call notifyStopped() if it hasn't been called yet. 
- if (eventExecutor.isTerminated() && stopNotified.compareAndSet(false, true)) { - notifyStopped(); - } - } - } + // Submit a task to the executor to make sure all pending events in the executor are fired before + // transiting this Service into STOPPED state + eventExecutor.submit(stopTask); + eventExecutor.shutdown(); } @Override public void process(WatchedEvent event) { + State state = state(); + if (state == State.TERMINATED || state == State.FAILED) { + return; + } + try { - if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) { + if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) { LOG.debug("Connected to ZooKeeper: {}", zkStr); notifyStarted(); return; @@ -451,23 +475,27 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK LOG.info("ZooKeeper session expired: {}", zkStr); // When connection expired, simply reconnect again - Thread t = new Thread(new Runnable() { + if (state != State.RUNNING) { + return; + } + eventExecutor.submit(new Runnable() { @Override public void run() { + // Only reconnect if the current state is running + if (state() != State.RUNNING) { + return; + } try { LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr); - zooKeeper.set(createZooKeeper()); + closeZooKeeper(zooKeeper.getAndSet(createZooKeeper())); } catch (IOException e) { - zooKeeper.set(null); notifyFailed(e); } } - }, "zk-reconnect"); - t.setDaemon(true); - t.start(); + }); } } finally { - if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) { + if (event.getType() == Event.EventType.None) { for (Watcher connectionWatcher : connectionWatchers) { connectionWatcher.process(event); } @@ -475,16 +503,58 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK } } + + /** + * Creates a {@link Runnable} task that will get executed in the event executor for transiting this + * Service into STOPPED state. 
+ */ + private Runnable createStopTask() { + return new Runnable() { + @Override + public void run() { + try { + // Close the ZK connection in this task will make sure if there is ZK connection created + // after doStop() was called but before this task has been executed is also closed. + // It is possible to happen when the following sequence happens: + // + // 1. session expired, hence the expired event is triggered + // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client + // 3. Service.stop() gets called, Service.state() changed to STOPPING + // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one + closeZooKeeper(zooKeeper.getAndSet(null)); + notifyStopped(); + } catch (Exception e) { + notifyFailed(e); + } + } + }; + } + /** * Creates a new ZooKeeper connection. */ private ZooKeeper createZooKeeper() throws IOException { - ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, this); + ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this)); for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) { zk.addAuthInfo(authInfo.getKey(), authInfo.getValue()); } return zk; } + + /** + * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException, + * it will get logged. 
+ */ + private void closeZooKeeper(@Nullable ZooKeeper zk) { + try { + if (zk != null) { + zk.close(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted when closing ZooKeeper", e); + Thread.currentThread().interrupt(); + } + } } /** http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2054c5f9/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java ---------------------------------------------------------------------- diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java index 837a5ae..8433b18 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java @@ -101,12 +101,20 @@ public final class LeaderElection extends AbstractService { Futures.addCallback(completion, new FutureCallback<String>() { @Override public void onSuccess(String result) { - notifyStopped(); + try { + notifyStopped(); + } finally { + executor.shutdown(); + } } @Override public void onFailure(Throwable t) { - notifyFailed(t); + try { + notifyFailed(t); + } finally { + executor.shutdown(); + } } }, Threads.SAME_THREAD_EXECUTOR); http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2054c5f9/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java ---------------------------------------------------------------------- diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java index 97dec03..162d4db 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; 
+import java.net.ServerSocket; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.List; @@ -235,38 +236,40 @@ public class ZKClientTest { } } }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS))); - client.startAndWait(); - - zkServer.stopAndWait(); - - Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS)); final CountDownLatch createLatch = new CountDownLatch(1); - Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() { - @Override - public void onSuccess(String result) { - createLatch.countDown(); - } + client.startAndWait(); + try { + zkServer.stopAndWait(); - @Override - public void onFailure(Throwable t) { - t.printStackTrace(System.out); - } - }); - - TimeUnit.SECONDS.sleep(2); - zkServer = InMemoryZKServer.builder() - .setDataDir(dataDir) - .setAutoCleanDataDir(true) - .setPort(port) - .setTickTime(1000) - .build(); - zkServer.startAndWait(); + Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS)); + Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() { + @Override + public void onSuccess(String result) { + createLatch.countDown(); + } - try { - Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS)); + @Override + public void onFailure(Throwable t) { + t.printStackTrace(System.out); + } + }); + + TimeUnit.SECONDS.sleep(2); + zkServer = InMemoryZKServer.builder() + .setDataDir(dataDir) + .setAutoCleanDataDir(true) + .setPort(port) + .setTickTime(1000) + .build(); + zkServer.startAndWait(); + try { + Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS)); + } finally { + zkServer.stopAndWait(); + } } finally { - zkServer.stopAndWait(); + client.stopAndWait(); } } @@ -353,4 +356,32 @@ public class ZKClientTest { zkServer.stopAndWait(); } } + + @Test + public void testStop() throws IOException, InterruptedException, ExecutionException { + try (final ServerSocket serverSocket = 
new ServerSocket(0)) { + // A latch to make sure at least one connection attempt from the zk client has been made + final CountDownLatch connectLatch = new CountDownLatch(1); + Thread serverThread = new Thread() { + public void run() { + try { + while (!interrupted()) { + serverSocket.accept().close(); + connectLatch.countDown(); + } + } catch (Exception e) { + // no-op + } + } + }; + serverThread.start(); + + ZKClientService zkClient = ZKClientService.Builder.of("localhost:" + serverSocket.getLocalPort()).build(); + zkClient.start(); + Assert.assertTrue(connectLatch.await(10, TimeUnit.SECONDS)); + + zkClient.stopAndWait(); + serverThread.interrupt(); + } + } }
