Make sure ChildReaper always moves through all registered nodes. Also, add an optional check so that nodes with too many children are never queried
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7fe94bb1 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7fe94bb1 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7fe94bb1 Branch: refs/heads/CURATOR-160 Commit: 7fe94bb1517fa32dcb2e1972ada9b7b493c1108b Parents: 6a56c51 Author: randgalt <randg...@apache.org> Authored: Mon Apr 6 16:41:24 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Mon Apr 6 16:41:24 2015 -0500 ---------------------------------------------------------------------- .../framework/recipes/locks/ChildReaper.java | 52 ++++- .../recipes/locks/TestChildReaper.java | 195 +++++++++++++++---- 2 files changed, 207 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/7fe94bb1/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java index 6a2c05a..ee5c414 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -18,13 +19,14 @@ */ package org.apache.curator.framework.recipes.locks; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; - -import org.apache.curator.framework.recipes.leader.LeaderLatch; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.utils.CloseableScheduledExecutorService; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.data.Stat; @@ -34,13 +36,14 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.apache.curator.utils.PathUtils; /** * Utility to reap empty child nodes of a parent node. 
Periodically calls getChildren on @@ -53,11 +56,13 @@ public class ChildReaper implements Closeable private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); private final CuratorFramework client; private final Collection<String> paths = Sets.newConcurrentHashSet(); + private volatile Iterator<String> pathIterator = null; private final Reaper.Mode mode; private final CloseableScheduledExecutorService executor; private final int reapingThresholdMs; private final LeaderLatch leaderLatch; private final Set<String> lockSchema; + private final AtomicInteger maxChildren = new AtomicInteger(-1); private volatile Future<?> task; @@ -210,19 +215,54 @@ public class ChildReaper implements Closeable return paths.remove(PathUtils.validatePath(path)); } + /** + * If a node has so many children that {@link CuratorFramework#getChildren()} will fail + * (due to jute.maxbuffer) it can cause connection instability. Set the max number of + * children here to prevent the path from being queried in these cases. 
The number should usually + * be: avergage-node-name/1000000 + * + * @param maxChildren max children + */ + public void setMaxChildren(int maxChildren) + { + this.maxChildren.set(maxChildren); + } + public static ScheduledExecutorService newExecutorService() { return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper"); } + @VisibleForTesting + protected void warnMaxChildren(String path, Stat stat) + { + log.warn(String.format("Skipping %s as it has too many children: %d", path, stat.getNumChildren())); + } + private void doWork() { - if (shouldDoWork()) + if ( shouldDoWork() ) { - for ( String path : paths ) + if ( (pathIterator == null) || !pathIterator.hasNext() ) + { + pathIterator = paths.iterator(); + } + while ( pathIterator.hasNext() ) { + String path = pathIterator.next(); try { + int maxChildren = this.maxChildren.get(); + if ( maxChildren > 0 ) + { + Stat stat = client.checkExists().forPath(path); + if ( (stat != null) && (stat.getNumChildren() > maxChildren) ) + { + warnMaxChildren(path, stat); + continue; + } + } + List<String> children = client.getChildren().forPath(path); for ( String name : children ) { http://git-wip-us.apache.org/repos/asf/curator/blob/7fe94bb1/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java index d81bb3a..906d9d4 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,35 +16,161 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.recipes.locks; -import org.apache.curator.framework.recipes.leader.LeaderLatch; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Random; +import java.util.concurrent.CountDownLatch; public class TestChildReaper extends BaseClassForTests { @Test - public void testSomeNodes() throws Exception + public void testMaxChildren() throws Exception + { + server.close(); + + final int LARGE_QTY = 10000; + + System.setProperty("jute.maxbuffer", "" + LARGE_QTY); + server = new TestingServer(); + try + { + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3)); + try + { + client.start(); + + for ( int i = 0; i < LARGE_QTY; ++i ) + { + if ( (i % 1000) == 0 ) + { + 
System.out.println(i); + } + client.create().creatingParentsIfNeeded().forPath("/big/node-" + i); + } + + try + { + client.getChildren().forPath("/big"); + Assert.fail("Should have been a connection loss"); + } + catch ( KeeperException.ConnectionLossException e ) + { + // expected + } + + final CountDownLatch latch = new CountDownLatch(1); + reaper = new ChildReaper(client, "/big", Reaper.Mode.REAP_UNTIL_DELETE, 1) + { + @Override + protected void warnMaxChildren(String path, Stat stat) + { + latch.countDown(); + super.warnMaxChildren(path, stat); + } + }; + reaper.setMaxChildren(100); + reaper.start(); + Assert.assertTrue(timing.awaitLatch(latch)); + } + finally + { + CloseableUtils.closeQuietly(reaper); + CloseableUtils.closeQuietly(client); + } + } + finally + { + System.clearProperty("jute.maxbuffer"); + } + } + + @Test + public void testLargeNodes() throws Exception { + server.close(); + + final int LARGE_QTY = 10000; + final int SMALL_QTY = 100; + + System.setProperty("jute.maxbuffer", "" + LARGE_QTY); + server = new TestingServer(); + try + { + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3)); + try + { + client.start(); + + for ( int i = 0; i < LARGE_QTY; ++i ) + { + if ( (i % 1000) == 0 ) + { + System.out.println(i); + } + client.create().creatingParentsIfNeeded().forPath("/big/node-" + i); + + if ( i < SMALL_QTY ) + { + client.create().creatingParentsIfNeeded().forPath("/small/node-" + i); + } + } + + reaper = new ChildReaper(client, "/foo", Reaper.Mode.REAP_UNTIL_DELETE, 1); + reaper.start(); - Timing timing = new Timing(); - ChildReaper reaper = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + reaper.addPath("/big"); + reaper.addPath("/small"); + + int count = -1; + for ( 
int i = 0; (i < 10) && (count != 0); ++i ) + { + timing.sleepABit(); + count = client.checkExists().forPath("/small").getNumChildren(); + } + Assert.assertEquals(count, 0); + } + finally + { + CloseableUtils.closeQuietly(reaper); + CloseableUtils.closeQuietly(client); + } + } + finally + { + System.clearProperty("jute.maxbuffer"); + } + } + + @Test + public void testSomeNodes() throws Exception + { + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); try { client.start(); - Random r = new Random(); - int nonEmptyNodes = 0; + Random r = new Random(); + int nonEmptyNodes = 0; for ( int i = 0; i < 10; ++i ) { client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i)); @@ -60,7 +186,7 @@ public class TestChildReaper extends BaseClassForTests timing.forWaiting().sleepABit(); - Stat stat = client.checkExists().forPath("/test"); + Stat stat = client.checkExists().forPath("/test"); Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes); } finally @@ -71,11 +197,11 @@ public class TestChildReaper extends BaseClassForTests } @Test - public void testSimple() throws Exception + public void testSimple() throws Exception { - Timing timing = new Timing(); - ChildReaper reaper = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); try { client.start(); @@ -90,7 +216,7 @@ public class TestChildReaper extends BaseClassForTests timing.forWaiting().sleepABit(); - Stat stat = client.checkExists().forPath("/test"); + Stat stat = client.checkExists().forPath("/test"); 
Assert.assertEquals(stat.getNumChildren(), 0); } finally @@ -101,11 +227,11 @@ public class TestChildReaper extends BaseClassForTests } @Test - public void testLeaderElection() throws Exception + public void testLeaderElection() throws Exception { - Timing timing = new Timing(); - ChildReaper reaper = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); LeaderLatch otherLeader = null; try { @@ -118,6 +244,7 @@ public class TestChildReaper extends BaseClassForTests otherLeader = new LeaderLatch(client, "/test-leader"); otherLeader.start(); + otherLeader.await(); reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader"); reaper.start(); @@ -125,7 +252,7 @@ public class TestChildReaper extends BaseClassForTests timing.forWaiting().sleepABit(); //Should not have reaped anything at this point since otherLeader is still leader - Stat stat = client.checkExists().forPath("/test"); + Stat stat = client.checkExists().forPath("/test"); Assert.assertEquals(stat.getNumChildren(), 10); CloseableUtils.closeQuietly(otherLeader); @@ -138,7 +265,7 @@ public class TestChildReaper extends BaseClassForTests finally { CloseableUtils.closeQuietly(reaper); - if (otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED) + if ( otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED ) { CloseableUtils.closeQuietly(otherLeader); } @@ -147,11 +274,11 @@ public class TestChildReaper extends BaseClassForTests } @Test - public void testMultiPath() throws Exception + public void testMultiPath() throws Exception { - Timing timing = new Timing(); - ChildReaper reaper = null; - 
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); try { client.start(); @@ -169,7 +296,7 @@ public class TestChildReaper extends BaseClassForTests timing.forWaiting().sleepABit(); - Stat stat = client.checkExists().forPath("/test1"); + Stat stat = client.checkExists().forPath("/test1"); Assert.assertEquals(stat.getNumChildren(), 0); stat = client.checkExists().forPath("/test2"); Assert.assertEquals(stat.getNumChildren(), 0); @@ -184,11 +311,11 @@ public class TestChildReaper extends BaseClassForTests } @Test - public void testNamespace() throws Exception + public void testNamespace() throws Exception { - Timing timing = new Timing(); - ChildReaper reaper = null; - CuratorFramework client = CuratorFrameworkFactory.builder() + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .sessionTimeoutMs(timing.session()) .connectionTimeoutMs(timing.connection()) @@ -209,7 +336,7 @@ public class TestChildReaper extends BaseClassForTests timing.forWaiting().sleepABit(); - Stat stat = client.checkExists().forPath("/test"); + Stat stat = client.checkExists().forPath("/test"); Assert.assertEquals(stat.getNumChildren(), 0); stat = client.usingNamespace(null).checkExists().forPath("/foo/test");