Repository: curator
Updated Branches:
  refs/heads/CURATOR-92 [created] 2580ef3f5


Added CloseMode support to LeaderLatch in order to be able to trigger the 
notLeader() callback when a Latch is manually closed.

This closes #1


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2580ef3f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2580ef3f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2580ef3f

Branch: refs/heads/CURATOR-92
Commit: 2580ef3f5df148a67fe5d2769e4d04890e8b4fd6
Parents: 0d9eaed
Author: David Trott <git...@davidtrott.com>
Authored: Wed Mar 5 16:57:04 2014 -0800
Committer: randgalt <randg...@apache.org>
Committed: Wed Mar 5 21:53:16 2014 -0500

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java   |  50 +++++-
 .../recipes/leader/TestLeaderLatch.java         | 171 +++++++++++++++++++
 2 files changed, 218 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 8d9a11f..310919b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -67,6 +67,7 @@ public class LeaderLatch implements Closeable
     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
     private final AtomicReference<String> ourPath = new AtomicReference<String>();
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+    private final CloseMode closeMode;
 
     private final ConnectionStateListener listener = new ConnectionStateListener()
     {
@@ -95,13 +96,19 @@ public class LeaderLatch implements Closeable
         CLOSED
     }
 
+    public enum CloseMode
+    {
+        SILENT,
+        NOTIFY_LEADER
+    }
+
     /**
      * @param client    the client
      * @param latchPath the path for this leadership group
      */
     public LeaderLatch(CuratorFramework client, String latchPath)
     {
-        this(client, latchPath, "");
+        this(client, latchPath, "", CloseMode.SILENT);
     }
 
     /**
@@ -111,9 +118,21 @@ public class LeaderLatch implements Closeable
      */
     public LeaderLatch(CuratorFramework client, String latchPath, String id)
     {
+        this(client, latchPath, id, CloseMode.SILENT);
+    }
+
+    /**
+     * @param client    the client
+     * @param latchPath the path for this leadership group
+     * @param id        participant ID
+     * @param closeMode behaviour of listener on explicit close.
+     */
+    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
+    {
         this.client = Preconditions.checkNotNull(client, "client cannot be null");
         this.latchPath = Preconditions.checkNotNull(latchPath, "mutexPath cannot be null");
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
+        this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
     }
 
     /**
@@ -139,7 +158,22 @@ public class LeaderLatch implements Closeable
     @Override
     public void close() throws IOException
     {
+        close(this.closeMode);
+    }
+
+    /**
+     * Remove this instance from the leadership election. If this instance is the leader, leadership
+     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
+     * instances must eventually be closed.
+     *
+     * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
+     *
+     * @throws IOException errors
+     */
+    public void close(CloseMode closeMode) throws IOException
+    {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+        Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
         try
         {
@@ -152,8 +186,18 @@ public class LeaderLatch implements Closeable
         finally
         {
             client.getConnectionStateListenable().removeListener(listener);
-            listeners.clear();
-            setLeadership(false);
+
+            switch(closeMode)
+            {
+                case NOTIFY_LEADER:
+                    setLeadership(false);
+                    listeners.clear();
+                    break;
+                default:
+                    listeners.clear();
+                    setLeadership(false);
+                    break;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/2580ef3f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index f4b5590..067c817 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -365,6 +365,177 @@ public class TestLeaderLatch extends BaseClassForTests
         }
     }
 
+    @Test
+    public void testCallbackNotifyLeader() throws Exception
+    {
+        final int PARTICIPANT_QTY = 10;
+        final int SILENT_QTY = 3;
+
+        final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
+        final AtomicLong masterCounter = new AtomicLong(0);
+        final AtomicLong dunceCounter = new AtomicLong(0);
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackNotifyLeader-%s").build());
+
+        List<LeaderLatch> latches = Lists.newArrayList();
+        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+        {
+            LeaderLatch.CloseMode closeMode = i < SILENT_QTY ?
+                    LeaderLatch.CloseMode.SILENT :
+                    LeaderLatch.CloseMode.NOTIFY_LEADER;
+
+            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME, "", closeMode);
+            latch.addListener(
+                new LeaderLatchListener()
+                {
+                    boolean beenLeader = false;
+
+                    @Override
+                    public void isLeader()
+                    {
+                        if ( !beenLeader )
+                        {
+                            masterCounter.incrementAndGet();
+                            beenLeader = true;
+                            try
+                            {
+                                latch.reset();
+                            }
+                            catch ( Exception e )
+                            {
+                                throw Throwables.propagate(e);
+                            }
+                        }
+                        else
+                        {
+                            masterCounter.incrementAndGet();
+                            CloseableUtils.closeQuietly(latch);
+                            timesSquare.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void notLeader()
+                    {
+                        dunceCounter.incrementAndGet();
+                    }
+                },
+                exec
+            );
+            latches.add(latch);
+        }
+
+        try
+        {
+            client.start();
+
+            for ( LeaderLatch latch : latches )
+            {
+                latch.start();
+            }
+
+            timesSquare.await();
+
+            Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
+            Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY * 2 - SILENT_QTY);
+            for ( LeaderLatch latch : latches )
+            {
+                Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
+            }
+        }
+        finally
+        {
+            for ( LeaderLatch latch : latches )
+            {
+                if ( latch.getState() != LeaderLatch.State.CLOSED )
+                {
+                    CloseableUtils.closeQuietly(latch);
+                }
+            }
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testCallbackDontNotifyDunce() throws Exception {
+        final AtomicLong masterCounter = new AtomicLong(0);
+        final AtomicLong dunceCounter = new AtomicLong(0);
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+
+        final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
+        final LeaderLatch dunce = new LeaderLatch(client, PATH_NAME, "", LeaderLatch.CloseMode.NOTIFY_LEADER);
+
+        leader.addListener(new LeaderLatchListener()
+        {
+            @Override
+            public void isLeader()
+            {
+            }
+
+            @Override
+            public void notLeader()
+            {
+                masterCounter.incrementAndGet();
+            }
+        });
+
+        dunce.addListener(new LeaderLatchListener()
+        {
+            @Override
+            public void isLeader()
+            {
+            }
+
+            @Override
+            public void notLeader()
+            {
+                dunceCounter.incrementAndGet();
+            }
+        });
+
+        try
+        {
+            client.start();
+
+            leader.start();
+
+            timing.sleepABit();
+
+            dunce.start();
+
+            timing.sleepABit();
+
+            dunce.close();
+
+            timing.sleepABit();
+
+            // Test the close override
+            leader.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
+
+            Assert.assertEquals(leader.getState(), LeaderLatch.State.CLOSED);
+            Assert.assertEquals(dunce.getState(), LeaderLatch.State.CLOSED);
+
+            Assert.assertEquals(masterCounter.get(), 1);
+            Assert.assertEquals(dunceCounter.get(), 0);
+        }
+        finally
+        {
+            if (leader.getState() != LeaderLatch.State.CLOSED)
+            {
+                CloseableUtils.closeQuietly(leader);
+            }
+            if (dunce.getState() != LeaderLatch.State.CLOSED)
+            {
+                CloseableUtils.closeQuietly(dunce);
+            }
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
     private enum Mode
     {
         START_IMMEDIATELY,

Reply via email to