make sure ChildReaper always moves through all registered nodes. Also, add an 
optional check so that large nodes are never queried


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7fe94bb1
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7fe94bb1
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7fe94bb1

Branch: refs/heads/CURATOR-160
Commit: 7fe94bb1517fa32dcb2e1972ada9b7b493c1108b
Parents: 6a56c51
Author: randgalt <randg...@apache.org>
Authored: Mon Apr 6 16:41:24 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Mon Apr 6 16:41:24 2015 -0500

----------------------------------------------------------------------
 .../framework/recipes/locks/ChildReaper.java    |  52 ++++-
 .../recipes/locks/TestChildReaper.java          | 195 +++++++++++++++----
 2 files changed, 207 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/7fe94bb1/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 6a2c05a..ee5c414 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,13 +19,14 @@
  */
 package org.apache.curator.framework.recipes.locks;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
-
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.utils.CloseableScheduledExecutorService;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.data.Stat;
@@ -34,13 +36,14 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * Utility to reap empty child nodes of a parent node. Periodically calls 
getChildren on
@@ -53,11 +56,13 @@ public class ChildReaper implements Closeable
     private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
     private final CuratorFramework client;
     private final Collection<String> paths = Sets.newConcurrentHashSet();
+    private volatile Iterator<String> pathIterator = null;
     private final Reaper.Mode mode;
     private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
     private final LeaderLatch leaderLatch;
     private final Set<String> lockSchema;
+    private final AtomicInteger maxChildren = new AtomicInteger(-1);
 
     private volatile Future<?> task;
 
@@ -210,19 +215,54 @@ public class ChildReaper implements Closeable
         return paths.remove(PathUtils.validatePath(path));
     }
 
+    /**
+     * If a node has so many children that {@link 
CuratorFramework#getChildren()} will fail
+     * (due to jute.maxbuffer) it can cause connection instability. Set the 
max number of
+     * children here to prevent the path from being queried in these cases. 
The number should usually
+     * be: 1000000/average-node-name-length
+     *
+     * @param maxChildren max children
+     */
+    public void setMaxChildren(int maxChildren)
+    {
+        this.maxChildren.set(maxChildren);
+    }
+
     public static ScheduledExecutorService newExecutorService()
     {
         return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
     }
 
+    @VisibleForTesting
+    protected void warnMaxChildren(String path, Stat stat)
+    {
+        log.warn(String.format("Skipping %s as it has too many children: %d", 
path, stat.getNumChildren()));
+    }
+
     private void doWork()
     {
-        if (shouldDoWork())
+        if ( shouldDoWork() )
         {
-            for ( String path : paths )
+            if ( (pathIterator == null) || !pathIterator.hasNext() )
+            {
+                pathIterator = paths.iterator();
+            }
+            while ( pathIterator.hasNext() )
             {
+                String path = pathIterator.next();
                 try
                 {
+                    int maxChildren = this.maxChildren.get();
+                    if ( maxChildren > 0 )
+                    {
+                        Stat stat = client.checkExists().forPath(path);
+                        if ( (stat != null) && (stat.getNumChildren() > 
maxChildren) )
+                        {
+                            warnMaxChildren(path, stat);
+                            continue;
+                        }
+                    }
+
                     List<String> children = client.getChildren().forPath(path);
                     for ( String name : children )
                     {

http://git-wip-us.apache.org/repos/asf/curator/blob/7fe94bb1/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
index d81bb3a..906d9d4 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,35 +16,161 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.locks;
 
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 
 public class TestChildReaper extends BaseClassForTests
 {
     @Test
-    public void     testSomeNodes() throws Exception
+    public void testMaxChildren() throws Exception
+    {
+        server.close();
+
+        final int LARGE_QTY = 10000;
+
+        System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
+        server = new TestingServer();
+        try
+        {
+            Timing timing = new Timing();
+            ChildReaper reaper = null;
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new ExponentialBackoffRetry(100, 3));
+            try
+            {
+                client.start();
+
+                for ( int i = 0; i < LARGE_QTY; ++i )
+                {
+                    if ( (i % 1000) == 0 )
+                    {
+                        System.out.println(i);
+                    }
+                    
client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
+                }
+
+                try
+                {
+                    client.getChildren().forPath("/big");
+                    Assert.fail("Should have been a connection loss");
+                }
+                catch ( KeeperException.ConnectionLossException e )
+                {
+                    // expected
+                }
+
+                final CountDownLatch latch = new CountDownLatch(1);
+                reaper = new ChildReaper(client, "/big", 
Reaper.Mode.REAP_UNTIL_DELETE, 1)
+                {
+                    @Override
+                    protected void warnMaxChildren(String path, Stat stat)
+                    {
+                        latch.countDown();
+                        super.warnMaxChildren(path, stat);
+                    }
+                };
+                reaper.setMaxChildren(100);
+                reaper.start();
+                Assert.assertTrue(timing.awaitLatch(latch));
+            }
+            finally
+            {
+                CloseableUtils.closeQuietly(reaper);
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+        finally
+        {
+            System.clearProperty("jute.maxbuffer");
+        }
+    }
+
+    @Test
+    public void testLargeNodes() throws Exception
     {
+        server.close();
+
+        final int LARGE_QTY = 10000;
+        final int SMALL_QTY = 100;
+
+        System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
+        server = new TestingServer();
+        try
+        {
+            Timing timing = new Timing();
+            ChildReaper reaper = null;
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new ExponentialBackoffRetry(100, 3));
+            try
+            {
+                client.start();
+
+                for ( int i = 0; i < LARGE_QTY; ++i )
+                {
+                    if ( (i % 1000) == 0 )
+                    {
+                        System.out.println(i);
+                    }
+                    
client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
+
+                    if ( i < SMALL_QTY )
+                    {
+                        
client.create().creatingParentsIfNeeded().forPath("/small/node-" + i);
+                    }
+                }
+
+                reaper = new ChildReaper(client, "/foo", 
Reaper.Mode.REAP_UNTIL_DELETE, 1);
+                reaper.start();
 
-        Timing                  timing = new Timing();
-        ChildReaper             reaper = null;
-        CuratorFramework        client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+                reaper.addPath("/big");
+                reaper.addPath("/small");
+
+                int count = -1;
+                for ( int i = 0; (i < 10) && (count != 0); ++i )
+                {
+                    timing.sleepABit();
+                    count = 
client.checkExists().forPath("/small").getNumChildren();
+                }
+                Assert.assertEquals(count, 0);
+            }
+            finally
+            {
+                CloseableUtils.closeQuietly(reaper);
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+        finally
+        {
+            System.clearProperty("jute.maxbuffer");
+        }
+    }
+
+    @Test
+    public void testSomeNodes() throws Exception
+    {
+        Timing timing = new Timing();
+        ChildReaper reaper = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            Random              r = new Random();
-            int                 nonEmptyNodes = 0;
+            Random r = new Random();
+            int nonEmptyNodes = 0;
             for ( int i = 0; i < 10; ++i )
             {
                 client.create().creatingParentsIfNeeded().forPath("/test/" + 
Integer.toString(i));
@@ -60,7 +186,7 @@ public class TestChildReaper extends BaseClassForTests
 
             timing.forWaiting().sleepABit();
 
-            Stat    stat = client.checkExists().forPath("/test");
+            Stat stat = client.checkExists().forPath("/test");
             Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes);
         }
         finally
@@ -71,11 +197,11 @@ public class TestChildReaper extends BaseClassForTests
     }
 
     @Test
-    public void     testSimple() throws Exception
+    public void testSimple() throws Exception
     {
-        Timing                  timing = new Timing();
-        ChildReaper             reaper = null;
-        CuratorFramework        client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        Timing timing = new Timing();
+        ChildReaper reaper = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
         try
         {
             client.start();
@@ -90,7 +216,7 @@ public class TestChildReaper extends BaseClassForTests
 
             timing.forWaiting().sleepABit();
 
-            Stat    stat = client.checkExists().forPath("/test");
+            Stat stat = client.checkExists().forPath("/test");
             Assert.assertEquals(stat.getNumChildren(), 0);
         }
         finally
@@ -101,11 +227,11 @@ public class TestChildReaper extends BaseClassForTests
     }
 
     @Test
-    public void     testLeaderElection() throws Exception
+    public void testLeaderElection() throws Exception
     {
-        Timing                  timing = new Timing();
-        ChildReaper             reaper = null;
-        CuratorFramework        client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        Timing timing = new Timing();
+        ChildReaper reaper = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
         LeaderLatch otherLeader = null;
         try
         {
@@ -118,6 +244,7 @@ public class TestChildReaper extends BaseClassForTests
 
             otherLeader = new LeaderLatch(client, "/test-leader");
             otherLeader.start();
+            otherLeader.await();
 
             reaper = new ChildReaper(client, "/test", 
Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, 
"/test-leader");
             reaper.start();
@@ -125,7 +252,7 @@ public class TestChildReaper extends BaseClassForTests
             timing.forWaiting().sleepABit();
 
             //Should not have reaped anything at this point since otherLeader 
is still leader
-            Stat    stat = client.checkExists().forPath("/test");
+            Stat stat = client.checkExists().forPath("/test");
             Assert.assertEquals(stat.getNumChildren(), 10);
 
             CloseableUtils.closeQuietly(otherLeader);
@@ -138,7 +265,7 @@ public class TestChildReaper extends BaseClassForTests
         finally
         {
             CloseableUtils.closeQuietly(reaper);
-            if (otherLeader != null && otherLeader.getState() == 
LeaderLatch.State.STARTED)
+            if ( otherLeader != null && otherLeader.getState() == 
LeaderLatch.State.STARTED )
             {
                 CloseableUtils.closeQuietly(otherLeader);
             }
@@ -147,11 +274,11 @@ public class TestChildReaper extends BaseClassForTests
     }
 
     @Test
-    public void     testMultiPath() throws Exception
+    public void testMultiPath() throws Exception
     {
-        Timing                  timing = new Timing();
-        ChildReaper             reaper = null;
-        CuratorFramework        client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        Timing timing = new Timing();
+        ChildReaper reaper = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
         try
         {
             client.start();
@@ -169,7 +296,7 @@ public class TestChildReaper extends BaseClassForTests
 
             timing.forWaiting().sleepABit();
 
-            Stat    stat = client.checkExists().forPath("/test1");
+            Stat stat = client.checkExists().forPath("/test1");
             Assert.assertEquals(stat.getNumChildren(), 0);
             stat = client.checkExists().forPath("/test2");
             Assert.assertEquals(stat.getNumChildren(), 0);
@@ -184,11 +311,11 @@ public class TestChildReaper extends BaseClassForTests
     }
 
     @Test
-    public void     testNamespace() throws Exception
+    public void testNamespace() throws Exception
     {
-        Timing                  timing = new Timing();
-        ChildReaper             reaper = null;
-        CuratorFramework        client = CuratorFrameworkFactory.builder()
+        Timing timing = new Timing();
+        ChildReaper reaper = null;
+        CuratorFramework client = CuratorFrameworkFactory.builder()
             .connectString(server.getConnectString())
             .sessionTimeoutMs(timing.session())
             .connectionTimeoutMs(timing.connection())
@@ -209,7 +336,7 @@ public class TestChildReaper extends BaseClassForTests
 
             timing.forWaiting().sleepABit();
 
-            Stat    stat = client.checkExists().forPath("/test");
+            Stat stat = client.checkExists().forPath("/test");
             Assert.assertEquals(stat.getNumChildren(), 0);
 
             stat = 
client.usingNamespace(null).checkExists().forPath("/foo/test");

Reply via email to