Repository: curator
Updated Branches:
  refs/heads/CURATOR-92 [created] 2580ef3f5
Added CloseMode support to LeaderLatch in order to be able to trigger the notLeader() callback when a Latch is manually closed. This closes #1 Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2580ef3f Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2580ef3f Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2580ef3f Branch: refs/heads/CURATOR-92 Commit: 2580ef3f5df148a67fe5d2769e4d04890e8b4fd6 Parents: 0d9eaed Author: David Trott <git...@davidtrott.com> Authored: Wed Mar 5 16:57:04 2014 -0800 Committer: randgalt <randg...@apache.org> Committed: Wed Mar 5 21:53:16 2014 -0500 ---------------------------------------------------------------------- .../framework/recipes/leader/LeaderLatch.java | 50 +++++- .../recipes/leader/TestLeaderLatch.java | 171 +++++++++++++++++++ 2 files changed, 218 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 8d9a11f..310919b 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -67,6 +67,7 @@ public class LeaderLatch implements Closeable private final AtomicBoolean hasLeadership = new AtomicBoolean(false); private final AtomicReference<String> ourPath = new AtomicReference<String>(); private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>(); + private final CloseMode closeMode; 
private final ConnectionStateListener listener = new ConnectionStateListener() { @@ -95,13 +96,19 @@ public class LeaderLatch implements Closeable CLOSED } + public enum CloseMode + { + SILENT, + NOTIFY_LEADER + } + /** * @param client the client * @param latchPath the path for this leadership group */ public LeaderLatch(CuratorFramework client, String latchPath) { - this(client, latchPath, ""); + this(client, latchPath, "", CloseMode.SILENT); } /** @@ -111,9 +118,21 @@ public class LeaderLatch implements Closeable */ public LeaderLatch(CuratorFramework client, String latchPath, String id) { + this(client, latchPath, id, CloseMode.SILENT); + } + + /** + * @param client the client + * @param latchPath the path for this leadership group + * @param id participant ID + * @param closeMode behaviour of listener on explicit close. + */ + public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode) + { this.client = Preconditions.checkNotNull(client, "client cannot be null"); this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null"); this.id = Preconditions.checkNotNull(id, "id cannot be null"); + this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); } /** @@ -139,7 +158,22 @@ public class LeaderLatch implements Closeable @Override public void close() throws IOException { + close(this.closeMode); + } + + /** + * Remove this instance from the leadership election. If this instance is the leader, leadership + * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch + * instances must eventually be closed. + * + * @param closeMode allows the default close mode to be overridden at the time the latch is closed. 
+ * + * @throws IOException errors + */ + public void close(CloseMode closeMode) throws IOException + { Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); + Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); try { @@ -152,8 +186,18 @@ public class LeaderLatch implements Closeable finally { client.getConnectionStateListenable().removeListener(listener); - listeners.clear(); - setLeadership(false); + + switch(closeMode) + { + case NOTIFY_LEADER: + setLeadership(false); + listeners.clear(); + break; + default: + listeners.clear(); + setLeadership(false); + break; + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index f4b5590..067c817 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -365,6 +365,177 @@ public class TestLeaderLatch extends BaseClassForTests } } + @Test + public void testCallbackNotifyLeader() throws Exception + { + final int PARTICIPANT_QTY = 10; + final int SILENT_QTY = 3; + + final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY); + final AtomicLong masterCounter = new AtomicLong(0); + final AtomicLong dunceCounter = new AtomicLong(0); + + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + ExecutorService exec = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build()); + + List<LeaderLatch> latches = Lists.newArrayList(); + for ( int i = 0; i < PARTICIPANT_QTY; ++i ) + { + LeaderLatch.CloseMode closeMode = i < SILENT_QTY ? + LeaderLatch.CloseMode.SILENT : + LeaderLatch.CloseMode.NOTIFY_LEADER; + + final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode); + latch.addListener( + new LeaderLatchListener() + { + boolean beenLeader = false; + + @Override + public void isLeader() + { + if ( !beenLeader ) + { + masterCounter.incrementAndGet(); + beenLeader = true; + try + { + latch.reset(); + } + catch ( Exception e ) + { + throw Throwables.propagate(e); + } + } + else + { + masterCounter.incrementAndGet(); + CloseableUtils.closeQuietly(latch); + timesSquare.countDown(); + } + } + + @Override + public void notLeader() + { + dunceCounter.incrementAndGet(); + } + }, + exec + ); + latches.add(latch); + } + + try + { + client.start(); + + for ( LeaderLatch latch : latches ) + { + latch.start(); + } + + timesSquare.await(); + + Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2); + Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY); + for ( LeaderLatch latch : latches ) + { + Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED); + } + } + finally + { + for ( LeaderLatch latch : latches ) + { + if ( latch.getState() != LeaderLatch.State.CLOSED ) + { + CloseableUtils.closeQuietly(latch); + } + } + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testCallbackDontNotifyDunce() throws Exception { + final AtomicLong masterCounter = new AtomicLong(0); + final AtomicLong dunceCounter = new AtomicLong(0); + + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + + final LeaderLatch leader = new LeaderLatch(client, PATH_NAME); + final LeaderLatch dunce = 
new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER); + + leader.addListener(new LeaderLatchListener() + { + @Override + public void isLeader() + { + } + + @Override + public void notLeader() + { + masterCounter.incrementAndGet(); + } + }); + + dunce.addListener(new LeaderLatchListener() + { + @Override + public void isLeader() + { + } + + @Override + public void notLeader() + { + dunceCounter.incrementAndGet(); + } + }); + + try + { + client.start(); + + leader.start(); + + timing.sleepABit(); + + dunce.start(); + + timing.sleepABit(); + + dunce.close(); + + timing.sleepABit(); + + // Test the close override + leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER); + + Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED); + Assert.assertEquals(dunce.getState(), LeaderLatch.State.CLOSED); + + Assert.assertEquals(masterCounter.get(), 1); + Assert.assertEquals(dunceCounter.get(), 0); + } + finally + { + if (leader.getState() != LeaderLatch.State.CLOSED) + { + CloseableUtils.closeQuietly(leader); + } + if (dunce.getState() != LeaderLatch.State.CLOSED) + { + CloseableUtils.closeQuietly(dunce); + } + CloseableUtils.closeQuietly(client); + } + } + private enum Mode { START_IMMEDIATELY,