AfterConnectionEstablished needs to return the Future from the executor service so that clients can cancel the action if needed. LeaderLatch now keeps this Future and cancels it on close.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5954e66f Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5954e66f Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5954e66f Branch: refs/heads/CURATOR-110 Commit: 5954e66fa3108c39b3f2915583def5e51915846f Parents: fab79a4 Author: randgalt <randg...@apache.org> Authored: Tue Jun 17 14:38:14 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Tue Jun 17 14:38:14 2014 -0500 ---------------------------------------------------------------------- .../recipes/AfterConnectionEstablished.java | 7 ++- .../framework/recipes/leader/LeaderLatch.java | 51 ++++++++++++++------ .../recipes/leader/TestLeaderLatch.java | 2 + 3 files changed, 44 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java index 41ba702..65c6ace 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.curator.framework.recipes; import org.apache.curator.framework.CuratorFramework; @@ -23,6 +24,7 @@ import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** * Utility class to allow execution of logic once a ZooKeeper connection becomes available. @@ -37,8 +39,9 @@ public class AfterConnectionEstablished * * @param client The curator client * @param runAfterConnection The logic to run + * @return future of the task so it can be canceled, etc. if needed */ - public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception + public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception { //Block until connected final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass())); @@ -62,7 +65,7 @@ public class AfterConnectionEstablished } } }; - executor.submit(internalCall); + return executor.submit(internalCall); } private AfterConnectionEstablished() http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 9d70645..6f7636a 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -46,6 +46,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import 
java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -69,6 +70,7 @@ public class LeaderLatch implements Closeable private final AtomicReference<String> ourPath = new AtomicReference<String>(); private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>(); private final CloseMode closeMode; + private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>(); private final ConnectionStateListener listener = new ConnectionStateListener() { @@ -155,22 +157,21 @@ public class LeaderLatch implements Closeable { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - AfterConnectionEstablished.execute(client, new Runnable() - { - @Override - public void run() + startTask.set(AfterConnectionEstablished.execute(client, new Runnable() { - client.getConnectionStateListenable().addListener(listener); - try - { - reset(); - } - catch ( Exception e ) + @Override + public void run() { - log.error("An error occurred checking resetting leadership.", e); + try + { + internalStart(); + } + finally + { + startTask.set(null); + } } - } - }); + })); } /** @@ -194,11 +195,17 @@ public class LeaderLatch implements Closeable * @param closeMode allows the default close mode to be overridden at the time the latch is closed. 
* @throws IOException errors */ - public void close(CloseMode closeMode) throws IOException + public synchronized void close(CloseMode closeMode) throws IOException { Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); + Future<?> localStartTask = startTask.getAndSet(null); + if ( localStartTask != null ) + { + localStartTask.cancel(true); + } + try { setNode(null); @@ -485,6 +492,22 @@ public class LeaderLatch implements Closeable client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); } + private synchronized void internalStart() + { + if ( state.get() == State.STARTED ) + { + client.getConnectionStateListenable().addListener(listener); + try + { + reset(); + } + catch ( Exception e ) + { + log.error("An error occurred checking resetting leadership.", e); + } + } + } + private void checkLeadership(List<String> children) throws Exception { final String localOurPath = ourPath.get(); http://git-wip-us.apache.org/repos/asf/curator/blob/5954e66f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index b97e708..f4fb1c7 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -89,6 +89,8 @@ public class TestLeaderLatch extends BaseClassForTests try { client.start(); + client.create().creatingParentsIfNeeded().forPath(PATH_NAME); 
+ LeaderLatch latch = new LeaderLatch(client, PATH_NAME); latch.debugResetWaitLatch = new CountDownLatch(1);