(TWILL-133) Make the ZKClient stoppable even if it is still trying to connect

- Also includes a refactoring of the DefaultZKClientService class to simplify
  the shutdown logic
- Fix a thread leak bug in LeaderElection

This closes #41 on GitHub

Signed-off-by: Terence Yim <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/2054c5f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/2054c5f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/2054c5f9

Branch: refs/heads/site
Commit: 2054c5f9364effabfa3008a3a9043b08b1d1b64c
Parents: ca33245
Author: Terence Yim <[email protected]>
Authored: Fri Jun 12 17:13:05 2015 -0700
Committer: Terence Yim <[email protected]>
Committed: Mon Jun 15 19:20:41 2015 -0700

----------------------------------------------------------------------
 .../zookeeper/DefaultZKClientService.java       | 164 +++++++++++++------
 .../internal/zookeeper/LeaderElection.java      |  12 +-
 .../apache/twill/zookeeper/ZKClientTest.java    |  85 +++++++---
 3 files changed, 185 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2054c5f9/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 7b9b345..dc2bfa9 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -58,7 +58,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
@@ -91,11 +90,11 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
                                 Watcher connectionWatcher, Multimap<String, 
byte[]> authInfos) {
     this.zkStr = zkStr;
     this.sessionTimeout = sessionTimeout;
-    this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+    this.connectionWatchers = new CopyOnWriteArrayList<>();
     this.authInfos = copyAuthInfo(authInfos);
     addConnectionWatcher(connectionWatcher);
 
-    this.zooKeeper = new AtomicReference<ZooKeeper>();
+    this.zooKeeper = new AtomicReference<>();
     serviceDelegate = new ServiceDelegate();
   }
 
@@ -111,7 +110,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
   }
 
   @Override
-  public Cancellable addConnectionWatcher(Watcher watcher) {
+  public Cancellable addConnectionWatcher(final Watcher watcher) {
     if (watcher == null) {
       return new Cancellable() {
         @Override
@@ -121,12 +120,13 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
       };
     }
 
-    final Watcher wrappedWatcher = wrapWatcher(watcher);
-    connectionWatchers.add(wrappedWatcher);
+    // Invocation of connection watchers are already done inside the event 
thread,
+    // hence no need to wrap the watcher again.
+    connectionWatchers.add(watcher);
     return new Cancellable() {
       @Override
       public void cancel() {
-        connectionWatchers.remove(wrappedWatcher);
+        connectionWatchers.remove(watcher);
       }
     };
   }
@@ -391,25 +391,57 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
   private final class ServiceDelegate extends AbstractService implements 
Watcher {
 
-    private final AtomicBoolean stopNotified = new AtomicBoolean(false);
-    private volatile boolean executorStopped;
+    private final Runnable stopTask;
 
-    @Override
-    protected void doStart() {
-      // A single thread executor
-      eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, 
new LinkedBlockingQueue<Runnable>(),
-                                             
Threads.createDaemonThreadFactory("zk-client-EventThread")) {
+    private ServiceDelegate() {
+      // Create the stop task in the constructor so that, if stop() is called from a shutdown hook,
+      // it won't fail with a class loading error caused by loading an inner class from the shutdown thread.
+      this.stopTask = createStopTask();
+
+      // Add a state-change listener so that the service can still be terminated when stop is requested
+      // while it is in the STARTING state
+      addListener(new Listener() {
         @Override
-        protected void terminated() {
-          super.terminated();
+        public void starting() {
+          // no-op
+        }
 
-          // Only call notifyStopped if the executor.shutdown() returned, otherwise deadlock (TWILL-110) can occur.
-          // Also, notifyStopped() should only be called once.
-          if (executorStopped && stopNotified.compareAndSet(false, true)) {
-            notifyStopped();
+        @Override
+        public void running() {
+          // no-op
+        }
+
+        @Override
+        public void stopping(State from) {
+          if (from == State.STARTING) {
+            // Still starting: notify started so that the service transitions out of the STARTING state.
+            notifyStarted();
           }
         }
-      };
+
+        @Override
+        public void terminated(State from) {
+          // no-op
+        }
+
+        @Override
+        public void failed(State from, Throwable failure) {
+          eventExecutor.shutdownNow();
+          // Close the ZK client on failure. This is needed because the stop task may never be executed.
+          closeZooKeeper(zooKeeper.getAndSet(null));
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+    }
+
+    @Override
+    protected void doStart() {
+      // A single thread executor for all events
+      ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, 
TimeUnit.MILLISECONDS,
+                                                           new 
LinkedBlockingQueue<Runnable>(),
+                                                           
Threads.createDaemonThreadFactory("zk-client-EventThread"));
+      // Just discard the execution if the executor is closed
+      executor.setRejectedExecutionHandler(new 
ThreadPoolExecutor.DiscardPolicy());
+      eventExecutor = executor;
 
       try {
         zooKeeper.set(createZooKeeper());
@@ -420,29 +452,21 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
     @Override
     protected void doStop() {
-      ZooKeeper zk = zooKeeper.getAndSet(null);
-      if (zk != null) {
-        try {
-          zk.close();
-        } catch (InterruptedException e) {
-          notifyFailed(e);
-        } finally {
-          eventExecutor.shutdown();
-          executorStopped = true;
-
-          // If the executor state is terminated, meaning the terminate() method is triggered,
-          // call notifyStopped() if it hasn't been called yet.
-          if (eventExecutor.isTerminated() && stopNotified.compareAndSet(false, true)) {
-            notifyStopped();
-          }
-        }
-      }
+      // Submit the stop task to the executor so that all events already pending in the executor are fired
+      // before this Service transitions into the STOPPED state; shutdown() still runs already-queued tasks.
+      eventExecutor.submit(stopTask);
+      eventExecutor.shutdown();
     }
 
     @Override
     public void process(WatchedEvent event) {
+      State state = state();
+      if (state == State.TERMINATED || state == State.FAILED) {
+        return;
+      }
+
       try {
-        if (event.getState() == Event.KeeperState.SyncConnected && state() == 
State.STARTING) {
+        if (event.getState() == Event.KeeperState.SyncConnected && state == 
State.STARTING) {
           LOG.debug("Connected to ZooKeeper: {}", zkStr);
           notifyStarted();
           return;
@@ -451,23 +475,27 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
           LOG.info("ZooKeeper session expired: {}", zkStr);
 
           // When connection expired, simply reconnect again
-          Thread t = new Thread(new Runnable() {
+          if (state != State.RUNNING) {
+            return;
+          }
+          eventExecutor.submit(new Runnable() {
             @Override
             public void run() {
+              // Only reconnect if the current state is running
+              if (state() != State.RUNNING) {
+                return;
+              }
               try {
                 LOG.info("Reconnect to ZooKeeper due to expiration: {}", 
zkStr);
-                zooKeeper.set(createZooKeeper());
+                closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
               } catch (IOException e) {
-                zooKeeper.set(null);
                 notifyFailed(e);
               }
             }
-          }, "zk-reconnect");
-          t.setDaemon(true);
-          t.start();
+          });
         }
       } finally {
-        if (event.getType() == Event.EventType.None && 
!connectionWatchers.isEmpty()) {
+        if (event.getType() == Event.EventType.None) {
           for (Watcher connectionWatcher : connectionWatchers) {
             connectionWatcher.process(event);
           }
@@ -475,16 +503,58 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
       }
     }
 
+
+    /**
+     * Creates a {@link Runnable} task that is executed in the event executor to transition this
+     * Service into the STOPPED state.
+     */
+    private Runnable createStopTask() {
+      return new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // Closing the ZK connection inside this task ensures that any ZK connection created
+            // after doStop() was called, but before this task has executed, is also closed.
+            // This can happen with the following sequence:
+            //
+            // 1. The session expires, so the expired event is triggered
+            // 2. The reconnect task executes. With Service.state() == RUNNING, it creates a new ZK client
+            // 3. Service.stop() gets called and Service.state() changes to STOPPING
+            // 4. The reconnect task updates zooKeeper with the newly created ZK client
+            closeZooKeeper(zooKeeper.getAndSet(null));
+            notifyStopped();
+          } catch (Exception e) {
+            notifyFailed(e);
+          }
+        }
+      };
+    }
+
     /**
      * Creates a new ZooKeeper connection.
      */
     private ZooKeeper createZooKeeper() throws IOException {
-      ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, this);
+      ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
       for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
         zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
       }
       return zk;
     }
+
+    /**
+     * Closes the given {@link ZooKeeper} if it is not null. An InterruptedException is logged and the
+     * thread's interrupt flag is restored instead of propagating the exception.
+     */
+    private void closeZooKeeper(@Nullable ZooKeeper zk) {
+      try {
+        if (zk != null) {
+          zk.close();
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted when closing ZooKeeper", e);
+        Thread.currentThread().interrupt();
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2054c5f9/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
index 837a5ae..8433b18 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
@@ -101,12 +101,20 @@ public final class LeaderElection extends AbstractService {
     Futures.addCallback(completion, new FutureCallback<String>() {
       @Override
       public void onSuccess(String result) {
-        notifyStopped();
+        try {
+          notifyStopped();
+        } finally {
+          executor.shutdown();
+        }
       }
 
       @Override
       public void onFailure(Throwable t) {
-        notifyFailed(t);
+        try {
+          notifyFailed(t);
+        } finally {
+          executor.shutdown();
+        }
       }
     }, Threads.SAME_THREAD_EXECUTOR);
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2054c5f9/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 97dec03..162d4db 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.List;
@@ -235,38 +236,40 @@ public class ZKClientTest {
         }
       }
     }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS)));
-    client.startAndWait();
-
-    zkServer.stopAndWait();
-
-    Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
 
     final CountDownLatch createLatch = new CountDownLatch(1);
-    Futures.addCallback(client.create("/testretry/test", null, 
CreateMode.PERSISTENT), new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String result) {
-        createLatch.countDown();
-      }
+    client.startAndWait();
+    try {
+      zkServer.stopAndWait();
 
-      @Override
-      public void onFailure(Throwable t) {
-        t.printStackTrace(System.out);
-      }
-    });
-
-    TimeUnit.SECONDS.sleep(2);
-    zkServer = InMemoryZKServer.builder()
-                               .setDataDir(dataDir)
-                               .setAutoCleanDataDir(true)
-                               .setPort(port)
-                               .setTickTime(1000)
-                               .build();
-    zkServer.startAndWait();
+      Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
+      Futures.addCallback(client.create("/testretry/test", null, 
CreateMode.PERSISTENT), new FutureCallback<String>() {
+        @Override
+        public void onSuccess(String result) {
+          createLatch.countDown();
+        }
 
-    try {
-      Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+        @Override
+        public void onFailure(Throwable t) {
+          t.printStackTrace(System.out);
+        }
+      });
+
+      TimeUnit.SECONDS.sleep(2);
+      zkServer = InMemoryZKServer.builder()
+                                 .setDataDir(dataDir)
+                                 .setAutoCleanDataDir(true)
+                                 .setPort(port)
+                                 .setTickTime(1000)
+                                 .build();
+      zkServer.startAndWait();
+      try {
+        Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+      } finally {
+        zkServer.stopAndWait();
+      }
     } finally {
-      zkServer.stopAndWait();
+      client.stopAndWait();
     }
   }
 
@@ -353,4 +356,32 @@ public class ZKClientTest {
       zkServer.stopAndWait();
     }
   }
+
+  @Test
+  public void testStop() throws IOException, InterruptedException, ExecutionException {
+    try (final ServerSocket serverSocket = new ServerSocket(0)) {
+      // The fake server accepts and immediately closes connections; this latch ensures at least one attempt was made
+      final CountDownLatch connectLatch = new CountDownLatch(1);
+      Thread serverThread = new Thread() {
+        public void run() {
+          try {
+            while (!interrupted()) {
+              serverSocket.accept().close();
+              connectLatch.countDown();
+            }
+          } catch (Exception e) {
+            // Expected: accept() throws once serverSocket is closed at the end of the test; just exit
+          }
+        }
+      };
+      serverThread.start();
+
+      ZKClientService zkClient = ZKClientService.Builder.of("localhost:" + serverSocket.getLocalPort()).build();
+      zkClient.start();
+      Assert.assertTrue(connectLatch.await(10, TimeUnit.SECONDS));
+
+      zkClient.stopAndWait();
+      serverThread.interrupt();
+    }
+  }
 }

Reply via email to