Adding a new constructor to Reaper so that it can optionally take a fully 
constructed leader latch that is owned by another class rather than create its 
own leader latch


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/520ae54a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/520ae54a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/520ae54a

Branch: refs/heads/CURATOR-160
Commit: 520ae54ac4a49292201417fa6b1104cf579704d3
Parents: febfcec
Author: David Kesler <dkes...@yodle.com>
Authored: Mon Feb 9 13:19:20 2015 -0500
Committer: David Kesler <dkes...@yodle.com>
Committed: Mon Feb 9 13:19:20 2015 -0500

----------------------------------------------------------------------
 .../curator/framework/recipes/locks/Reaper.java | 62 +++++++++++---
 .../framework/recipes/locks/TestReaper.java     | 90 +++++++++++++++++++-
 2 files changed, 137 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/520ae54a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index 8802372..660e3d3 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -52,6 +52,7 @@ public class Reaper implements Closeable
     private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
     private final LeaderLatch leaderLatch;
     private final AtomicBoolean reapingIsActive = new AtomicBoolean(true);
+    private final boolean ownsLeaderLatch;
 
     private enum State
     {
@@ -111,7 +112,7 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client)
     {
-        this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null);
+        this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, 
(String) null);
     }
 
     /**
@@ -122,7 +123,7 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client, int reapingThresholdMs)
     {
-        this(client, newExecutorService(), reapingThresholdMs, null);
+        this(client, newExecutorService(), reapingThresholdMs, (String) null);
     }
 
     /**
@@ -132,7 +133,7 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client, ScheduledExecutorService executor, 
int reapingThresholdMs)
     {
-        this(client, executor, reapingThresholdMs, null);
+        this(client, executor, reapingThresholdMs, (String) null);
     }
 
     /**
@@ -143,18 +144,41 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client, ScheduledExecutorService executor, 
int reapingThresholdMs, String leaderPath)
     {
+        this(client, executor, reapingThresholdMs, 
makeLeaderLatchIfPathNotNull(client, leaderPath), true);
+    }
+
+    /**
+     * @param client             client
+     * @param executor           thread pool
+     * @param reapingThresholdMs threshold in milliseconds that determines 
that a path can be deleted
+     * @param leaderLatch        a pre-created leader latch to ensure only 1 
reaper is active in the cluster
+     */
+    public Reaper(CuratorFramework client, ScheduledExecutorService executor, 
int reapingThresholdMs, LeaderLatch leaderLatch)
+    {
+        this(client, executor, reapingThresholdMs, leaderLatch, false);
+    }
+
+    /**
+     * @param client             client
+     * @param executor           thread pool
+     * @param reapingThresholdMs threshold in milliseconds that determines 
that a path can be deleted
+     * @param leaderLatch        a pre-created leader latch to ensure only 1 
reaper is active in the cluster
+     * @param ownsLeaderLatch    indicates whether or not the reaper owns the 
leader latch (if it exists) and thus should start/stop it
+     * */
+    private Reaper(CuratorFramework client, ScheduledExecutorService executor, 
int reapingThresholdMs, LeaderLatch leaderLatch, boolean ownsLeaderLatch)
+    {
         this.client = client;
         this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
-
-        LeaderLatch localLeaderLatch = null;
-        if ( leaderPath != null )
+        this.leaderLatch = leaderLatch;
+        if (leaderLatch != null)
         {
-            localLeaderLatch = makeLeaderLatch(client, leaderPath);
+            addListenerToLeaderLatch(leaderLatch);
         }
-        leaderLatch = localLeaderLatch;
+        this.ownsLeaderLatch = ownsLeaderLatch;
     }
 
+
     /**
      * Add a path (using Mode.REAP_INDEFINITELY) to be checked by the reaper. 
The path will be checked periodically
      * until the reaper is closed.
@@ -200,7 +224,7 @@ public class Reaper implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Cannot be started more than once");
 
-        if ( leaderLatch != null )
+        if ( leaderLatch != null && ownsLeaderLatch)
         {
             leaderLatch.start();
         }
@@ -212,7 +236,7 @@ public class Reaper implements Closeable
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
             executor.close();
-            if ( leaderLatch != null )
+            if ( leaderLatch != null && ownsLeaderLatch )
             {
                 leaderLatch.close();
             }
@@ -310,11 +334,10 @@ public class Reaper implements Closeable
         return ThreadUtils.newSingleThreadScheduledExecutor("Reaper");
     }
 
-    private LeaderLatch makeLeaderLatch(CuratorFramework client, String 
leaderPath)
+    private void addListenerToLeaderLatch(LeaderLatch leaderLatch)
     {
         reapingIsActive.set(false);
 
-        LeaderLatch localLeaderLatch = new LeaderLatch(client, leaderPath);
         LeaderLatchListener listener = new LeaderLatchListener()
         {
             @Override
@@ -333,7 +356,18 @@ public class Reaper implements Closeable
                 reapingIsActive.set(false);
             }
         };
-        localLeaderLatch.addListener(listener);
-        return localLeaderLatch;
+        leaderLatch.addListener(listener);
+    }
+
+    private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework 
client, String leaderPath)
+    {
+        if (leaderPath == null)
+        {
+            return null;
+        }
+        else
+        {
+            return new LeaderLatch(client, leaderPath);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/520ae54a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
index 83ec960..c47808f 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import org.apache.curator.framework.state.ConnectionState;
@@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class TestReaper extends BaseClassForTests
 {
     @Test
-    public void testUsingLeader() throws Exception
+    public void testUsingLeaderPath() throws Exception
     {
         final Timing timing = new Timing();
         CuratorFramework client = makeClient(timing, null);
@@ -118,6 +119,93 @@ public class TestReaper extends BaseClassForTests
     }
 
     @Test
+    public void testUsingLeaderLatch() throws Exception
+    {
+        final Timing timing = new Timing();
+        CuratorFramework client = makeClient(timing, null);
+        Reaper reaper1 = null;
+        Reaper reaper2 = null;
+        LeaderLatch leaderLatch1 = null;
+        LeaderLatch leaderLatch2 = null;
+        try
+        {
+            final AtomicInteger reaper1Count = new AtomicInteger();
+            leaderLatch1 = new LeaderLatch(client, "/reaper/leader");
+            reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, 
leaderLatch1)
+            {
+                @Override
+                protected void reap(PathHolder holder)
+                {
+                    reaper1Count.incrementAndGet();
+                    super.reap(holder);
+                }
+            };
+
+            final AtomicInteger reaper2Count = new AtomicInteger();
+            leaderLatch2 = new LeaderLatch(client, "/reaper/leader");
+            reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, 
leaderLatch2)
+            {
+                @Override
+                protected void reap(PathHolder holder)
+                {
+                    reaper2Count.incrementAndGet();
+                    super.reap(holder);
+                }
+            };
+
+            client.start();
+            
client.create().creatingParentsIfNeeded().forPath("/one/two/three");
+
+            leaderLatch1.start();
+            leaderLatch2.start();
+
+            reaper1.start();
+            reaper2.start();
+
+            reaper1.addPath("/one/two/three");
+            reaper2.addPath("/one/two/three");
+
+            timing.sleepABit();
+
+            Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() 
== 0));
+            Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() 
> 0));
+
+            Reaper activeReaper;
+            LeaderLatch activeLeaderLeatch;
+            AtomicInteger inActiveReaperCount;
+            if ( reaper1Count.get() > 0 )
+            {
+                activeReaper = reaper1;
+                activeLeaderLeatch = leaderLatch1;
+                inActiveReaperCount = reaper2Count;
+            }
+            else
+            {
+                activeReaper = reaper2;
+                activeLeaderLeatch = leaderLatch2;
+                inActiveReaperCount = reaper1Count;
+            }
+            Assert.assertEquals(inActiveReaperCount.get(), 0);
+            activeReaper.close();
+            activeLeaderLeatch.close();
+            timing.sleepABit();
+            Assert.assertTrue(inActiveReaperCount.get() > 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(reaper1);
+            CloseableUtils.closeQuietly(reaper2);
+            if (leaderLatch1 != null && LeaderLatch.State.STARTED == 
leaderLatch1.getState()) {
+                CloseableUtils.closeQuietly(leaderLatch1);
+            }
+            if (leaderLatch2 != null && LeaderLatch.State.STARTED == 
leaderLatch2.getState()) {
+                CloseableUtils.closeQuietly(leaderLatch2);
+            }
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testUsingManualLeader() throws Exception
     {
         final Timing timing = new Timing();

Reply via email to