AfterConnectionEstablished needs to return the future from the service so that 
clients can cancel the action if needed. Added this to LeaderLatch


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5954e66f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5954e66f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5954e66f

Branch: refs/heads/CURATOR-110
Commit: 5954e66fa3108c39b3f2915583def5e51915846f
Parents: fab79a4
Author: randgalt <randg...@apache.org>
Authored: Tue Jun 17 14:38:14 2014 -0500
Committer: randgalt <randg...@apache.org>
Committed: Tue Jun 17 14:38:14 2014 -0500

----------------------------------------------------------------------
 .../recipes/AfterConnectionEstablished.java     |  7 ++-
 .../framework/recipes/leader/LeaderLatch.java   | 51 ++++++++++++++------
 .../recipes/leader/TestLeaderLatch.java         |  2 +
 3 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index 41ba702..65c6ace 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -23,6 +24,7 @@ import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 /**
  * Utility class to allow execution of logic once a ZooKeeper connection 
becomes available.
@@ -37,8 +39,9 @@ public class AfterConnectionEstablished
      *
      * @param client             The curator client
      * @param runAfterConnection The logic to run
+     * @return future of the task so it can be canceled, etc. if needed
      */
-    public static void execute(final CuratorFramework client, final Runnable 
runAfterConnection) throws Exception
+    public static Future<?> execute(final CuratorFramework client, final 
Runnable runAfterConnection) throws Exception
     {
         //Block until connected
         final ExecutorService executor = 
ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
@@ -62,7 +65,7 @@ public class AfterConnectionEstablished
                 }
             }
         };
-        executor.submit(internalCall);
+        return executor.submit(internalCall);
     }
 
     private AfterConnectionEstablished()

http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 9d70645..6f7636a 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -46,6 +46,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +70,7 @@ public class LeaderLatch implements Closeable
     private final AtomicReference<String> ourPath = new 
AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new 
ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
+    private final AtomicReference<Future<?>> startTask = new 
AtomicReference<Future<?>>();
 
     private final ConnectionStateListener listener = new 
ConnectionStateListener()
     {
@@ -155,22 +157,21 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Cannot be started more than once");
 
-        AfterConnectionEstablished.execute(client, new Runnable()
-            {
-                @Override
-                public void run()
+        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
                 {
-                    
client.getConnectionStateListenable().addListener(listener);
-                    try
-                    {
-                        reset();
-                    }
-                    catch ( Exception e )
+                    @Override
+                    public void run()
                     {
-                        log.error("An error occurred checking resetting 
leadership.", e);
+                        try
+                        {
+                            internalStart();
+                        }
+                        finally
+                        {
+                            startTask.set(null);
+                        }
                     }
-                }
-            });
+                }));
     }
 
     /**
@@ -194,11 +195,17 @@ public class LeaderLatch implements Closeable
      * @param closeMode allows the default close mode to be overridden at the 
time the latch is closed.
      * @throws IOException errors
      */
-    public void close(CloseMode closeMode) throws IOException
+    public synchronized void close(CloseMode closeMode) throws IOException
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, 
State.CLOSED), "Already closed or has not been started");
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
+        Future<?> localStartTask = startTask.getAndSet(null);
+        if ( localStartTask != null )
+        {
+            localStartTask.cancel(true);
+        }
+
         try
         {
             setNode(null);
@@ -485,6 +492,22 @@ public class LeaderLatch implements Closeable
         
client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath,
 LOCK_NAME), LeaderSelector.getIdBytes(id));
     }
 
+    private synchronized void internalStart()
+    {
+        if ( state.get() == State.STARTED )
+        {
+            client.getConnectionStateListenable().addListener(listener);
+            try
+            {
+                reset();
+            }
+            catch ( Exception e )
+            {
+                log.error("An error occurred checking resetting leadership.", 
e);
+            }
+        }
+    }
+
     private void checkLeadership(List<String> children) throws Exception
     {
         final String localOurPath = ourPath.get();

http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index b97e708..f4fb1c7 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -89,6 +89,8 @@ public class TestLeaderLatch extends BaseClassForTests
         try
         {
             client.start();
+            client.create().creatingParentsIfNeeded().forPath(PATH_NAME);
+
             LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
 
             latch.debugResetWaitLatch = new CountDownLatch(1);

Reply via email to