ChildReaper now creates its own LeaderLatch when a leader path is provided, and does no work (such as passing paths to its Reaper) while it is not the leader.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/49eb02a0 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/49eb02a0 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/49eb02a0 Branch: refs/heads/CURATOR-154 Commit: 49eb02a04f377a3b9e2da3b3904311ddddf1aa9d Parents: 9da7960 Author: David Kesler <dkes...@yodle.com> Authored: Mon Feb 9 13:50:39 2015 -0500 Committer: David Kesler <dkes...@yodle.com> Committed: Mon Feb 9 13:50:39 2015 -0500 ---------------------------------------------------------------------- .../framework/recipes/locks/ChildReaper.java | 56 ++++++++++++++------ .../recipes/locks/TestChildReaper.java | 47 ++++++++++++++++ 2 files changed, 88 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/49eb02a0/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java index 6e0a7e4..56c56ab 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java @@ -20,6 +20,8 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; + +import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.CloseableScheduledExecutorService; @@ -52,6 +54,7 @@ public class ChildReaper implements Closeable private final Reaper.Mode mode; private final 
CloseableScheduledExecutorService executor; private final int reapingThresholdMs; + private final LeaderLatch leaderLatch; private volatile Future<?> task; @@ -109,7 +112,15 @@ public class ChildReaper implements Closeable this.mode = mode; this.executor = new CloseableScheduledExecutorService(executor); this.reapingThresholdMs = reapingThresholdMs; - this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath); + if (leaderPath != null) + { + leaderLatch = new LeaderLatch(client, leaderPath); + } + else + { + leaderLatch = null; + } + this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch); addPath(path); } @@ -136,7 +147,10 @@ public class ChildReaper implements Closeable reapingThresholdMs, TimeUnit.MILLISECONDS ); - + if (leaderLatch != null) + { + leaderLatch.start(); + } reaper.start(); } @@ -146,6 +160,10 @@ public class ChildReaper implements Closeable if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { CloseableUtils.closeQuietly(reaper); + if (leaderLatch != null) + { + CloseableUtils.closeQuietly(leaderLatch); + } task.cancel(true); } } @@ -173,32 +191,40 @@ public class ChildReaper implements Closeable return paths.remove(PathUtils.validatePath(path)); } - private static ScheduledExecutorService newExecutorService() + public static ScheduledExecutorService newExecutorService() { return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper"); } private void doWork() { - for ( String path : paths ) + if (shouldDoWork()) { - try + for ( String path : paths ) { - List<String> children = client.getChildren().forPath(path); - for ( String name : children ) + try { - String thisPath = ZKPaths.makePath(path, name); - Stat stat = client.checkExists().forPath(thisPath); - if ( (stat != null) && (stat.getNumChildren() == 0) ) + List<String> children = client.getChildren().forPath(path); + for ( String name : children ) { - reaper.addPath(thisPath, mode); + String thisPath = ZKPaths.makePath(path, name); + Stat stat = 
client.checkExists().forPath(thisPath); + if ( (stat != null) && (stat.getNumChildren() == 0) ) + { + reaper.addPath(thisPath, mode); + } } } - } - catch ( Exception e ) - { - log.error("Could not get children for path: " + path, e); + catch ( Exception e ) + { + log.error("Could not get children for path: " + path, e); + } } } } + + private boolean shouldDoWork() + { + return this.leaderLatch == null || this.leaderLatch.hasLeadership(); + } } http://git-wip-us.apache.org/repos/asf/curator/blob/49eb02a0/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java index ad6ba6c..d81bb3a 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java @@ -18,6 +18,7 @@ */ package org.apache.curator.framework.recipes.locks; +import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; @@ -100,6 +101,52 @@ public class TestChildReaper extends BaseClassForTests } @Test + public void testLeaderElection() throws Exception + { + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + LeaderLatch otherLeader = null; + try + { + client.start(); + + for ( int i = 0; i < 10; ++i ) + { + client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i)); + } + + otherLeader = new LeaderLatch(client, "/test-leader"); + 
otherLeader.start(); + + reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader"); + reaper.start(); + + timing.forWaiting().sleepABit(); + + //Should not have reaped anything at this point since otherLeader is still leader + Stat stat = client.checkExists().forPath("/test"); + Assert.assertEquals(stat.getNumChildren(), 10); + + CloseableUtils.closeQuietly(otherLeader); + + timing.forWaiting().sleepABit(); + + stat = client.checkExists().forPath("/test"); + Assert.assertEquals(stat.getNumChildren(), 0); + } + finally + { + CloseableUtils.closeQuietly(reaper); + if (otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED) + { + CloseableUtils.closeQuietly(otherLeader); + } + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testMultiPath() throws Exception { Timing timing = new Timing();