Repository: curator Updated Branches: refs/heads/master ac2b903d9 -> b9fb9ffd5
CURATOR-145: TreeCache should implement maxDepth Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d589063e Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d589063e Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d589063e Branch: refs/heads/master Commit: d589063ecd4d64f07244e9e5264b1e36976faa50 Parents: ac2b903 Author: Scott Blum <sco...@squareup.com> Authored: Fri Aug 22 16:34:29 2014 -0400 Committer: Scott Blum <dragonsi...@apache.org> Committed: Thu Oct 16 17:03:28 2014 -0400 ---------------------------------------------------------------------- .../framework/recipes/cache/TreeCache.java | 40 +++++++++--- .../recipes/cache/BaseTestTreeCache.java | 1 + .../framework/recipes/cache/TestTreeCache.java | 66 ++++++++++++++++++++ .../recipes/cache/TestTreeCacheRandomTree.java | 59 ++++++++++++++--- 4 files changed, 149 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/d589063e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index 7d51cd8..db6ba01 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -77,6 +77,7 @@ public class TreeCache implements Closeable private boolean cacheData = true; private boolean dataIsCompressed = false; private CloseableExecutorService executorService = null; + private int maxDepth = Integer.MAX_VALUE; private Builder(CuratorFramework client, String path) { @@ -94,7 +95,7 @@ public class TreeCache implements Closeable { executor = new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory)); } - return new TreeCache(client, path, cacheData, dataIsCompressed, executor); + return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor); } /** @@ -146,6 +147,17 @@ public class TreeCache implements Closeable this.executorService = checkNotNull(executorService); return this; } + + /** + * Sets the maximum depth to explore/watch. A {@code maxDepth} of {@code 0} will watch only + * the root node; a {@code maxDepth} of {@code 1} will watch the root node and its immediate + * children. Default: {@code Integer.MAX_VALUE} + */ + public Builder setMaxDepth(int maxDepth) + { + this.maxDepth = maxDepth; + return this; + } } /** @@ -179,24 +191,34 @@ public class TreeCache implements Closeable final AtomicReference<Stat> stat = new AtomicReference<Stat>(); final AtomicReference<byte[]> data = new AtomicReference<byte[]>(); final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>(); + final int depth; TreeNode(String path, TreeNode parent) { this.path = path; this.parent = parent; + this.depth = parent == null ? 0 : parent.depth + 1; } private void refresh() throws Exception { - outstandingOps.addAndGet(2); - doRefreshData(); - doRefreshChildren(); + if (depth < maxDepth) + { + outstandingOps.addAndGet(2); + doRefreshData(); + doRefreshChildren(); + } else { + refreshData(); + } } private void refreshChildren() throws Exception { - outstandingOps.incrementAndGet(); - doRefreshChildren(); + if (depth < maxDepth) + { + outstandingOps.incrementAndGet(); + doRefreshChildren(); + } } private void refreshData() throws Exception @@ -446,6 +468,7 @@ public class TreeCache implements Closeable private final CloseableExecutorService executorService; private final boolean cacheData; private final boolean dataIsCompressed; + private final int maxDepth; private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>(); private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>(); private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT); @@ -476,7 +499,7 @@ public class TreeCache implements Closeable */ public TreeCache(CuratorFramework client, String path) { - this(client, path, true, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); + this(client, path, true, false, Integer.MAX_VALUE, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); } /** @@ -486,12 +509,13 @@ public class TreeCache implements Closeable * @param dataIsCompressed if true, data in the path is compressed * @param executorService Closeable ExecutorService to use for the TreeCache's background thread */ - TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) + TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final CloseableExecutorService executorService) { this.root = new TreeNode(validatePath(path), null); this.client = client; this.cacheData = cacheData; this.dataIsCompressed = dataIsCompressed; + this.maxDepth = maxDepth; this.executorService = executorService; } http://git-wip-us.apache.org/repos/asf/curator/blob/d589063e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java index 69d3c34..ab37785 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.AfterMethod; http://git-wip-us.apache.org/repos/asf/curator/blob/d589063e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java index 4f3d914..497e997 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java @@ -68,6 +68,53 @@ public class TestTreeCache extends BaseTestTreeCache } @Test + public void testDepth0() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/1", "one".getBytes()); + client.create().forPath("/test/2", "two".getBytes()); + client.create().forPath("/test/3", "three".getBytes()); + client.create().forPath("/test/2/sub", "two-sub".getBytes()); + + cache = buildWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(0)); + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.getCurrentData("/test/1")); + Assert.assertNull(cache.getCurrentChildren("/test/1")); + Assert.assertNull(cache.getCurrentData("/test/non_exist")); + } + + @Test + public void testDepth1() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/1", "one".getBytes()); + client.create().forPath("/test/2", "two".getBytes()); + client.create().forPath("/test/3", "three".getBytes()); + client.create().forPath("/test/2/sub", "two-sub".getBytes()); + + cache = buildWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(1)); + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/1", "one".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2", "two".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/3", "three".getBytes()); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3")); + Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of()); + Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.getCurrentData("/test/2/sub")); + Assert.assertNull(cache.getCurrentChildren("/test/2/sub")); + Assert.assertNull(cache.getCurrentChildren("/test/non_exist")); + } + + @Test public void testAsyncInitialPopulation() throws Exception { client.create().forPath("/test"); @@ -102,6 +149,25 @@ public class TestTreeCache extends BaseTestTreeCache } @Test + public void testFromRootWithDepth() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/one", "hey there".getBytes()); + + cache = buildWithListeners(TreeCache.newBuilder(client, "/").setMaxDepth(1)); + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test")); + Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); + Assert.assertNull(cache.getCurrentData("/test/one")); + Assert.assertNull(cache.getCurrentChildren("/test/one")); + } + + @Test public void testWithNamespace() throws Exception { client.create().forPath("/outer"); http://git-wip-us.apache.org/repos/asf/curator/blob/d589063e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java index c4501af..96ce75c 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java @@ -50,20 +50,39 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache // These constants will produce a tree about 10 levels deep. private static final int ITERATIONS = 1000; private static final double DIVE_CHANCE = 0.9; + private static final int TEST_DEPTH = 5; private final Random random = new Random(); + private boolean withDepth = false; + + @Test + public void testGiantRandomDeepTree() throws Exception { + doTestGiantRandomDeepTree(); + } + + @Test + public void testGiantRandomDeepTreeWithDepth() throws Exception { + withDepth = true; + doTestGiantRandomDeepTree(); + } /** * Randomly construct a large tree of test data in memory, mirror it into ZK, and then use * a TreeCache to follow the changes. At each step, assert that TreeCache matches our * source-of-truth test data, and that we see exactly the set of events we expect to see. */ - @Test - public void testGiantRandomDeepTree() throws Exception + private void doTestGiantRandomDeepTree() throws Exception { client.create().forPath("/tree", null); CuratorFramework cl = client.usingNamespace("tree"); - cache = newTreeCacheWithListeners(cl, "/"); + if ( withDepth ) + { + cache = buildWithListeners(TreeCache.newBuilder(cl, "/").setMaxDepth(TEST_DEPTH)); + } + else + { + cache = newTreeCacheWithListeners(cl, "/"); + } cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/"); assertEvent(TreeCacheEvent.Type.INITIALIZED); @@ -104,7 +123,10 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache cl.delete().forPath(node.fullPath); // TreeCache should see the delete. - assertEvent(TreeCacheEvent.Type.NODE_REMOVED, node.fullPath); + if (shouldSeeEventAt(node.fullPath)) + { + assertEvent(TreeCacheEvent.Type.NODE_REMOVED, node.fullPath); + } ++removals; } break; @@ -126,7 +148,10 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache cl.setData().forPath(node.fullPath, node.data); // TreeCache should see the update. - assertEvent(TreeCacheEvent.Type.NODE_UPDATED, node.fullPath, node.data); + if (shouldSeeEventAt(node.fullPath)) + { + assertEvent(TreeCacheEvent.Type.NODE_UPDATED, node.fullPath, node.data); + } ++updates; break; @@ -149,7 +174,10 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache cl.create().forPath(child.fullPath, child.data); // TreeCache should see the add. - assertEvent(TreeCacheEvent.Type.NODE_ADDED, child.fullPath, child.data); + if (shouldSeeEventAt(child.fullPath)) + { + assertEvent(TreeCacheEvent.Type.NODE_ADDED, child.fullPath, child.data); + } ++adds; break; @@ -157,7 +185,7 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache // Each iteration, ensure the cached state matches our source-of-truth tree. assertNodeEquals(cache.getCurrentData("/"), root); - assertTreeEquals(cache, root); + assertTreeEquals(cache, root, 0); } // Typical stats for this test: maxDepth: 10, adds: 349, removals: 198, updates: 320 @@ -167,13 +195,26 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache } /** + * Returns true we should see an event at this path based on maxDepth, false otherwise. + */ + private boolean shouldSeeEventAt(String fullPath) + { + return !withDepth || ZKPaths.split(fullPath).size() <= TEST_DEPTH; + } + + /** * Recursively assert that current children equal expected children. */ - private static void assertTreeEquals(TreeCache cache, TestNode expectedNode) + private void assertTreeEquals(TreeCache cache, TestNode expectedNode, int depth) { String path = expectedNode.fullPath; Map<String, ChildData> cacheChildren = cache.getCurrentChildren(path); Assert.assertNotNull(cacheChildren, path); + + if (withDepth && depth == TEST_DEPTH) { + return; + } + Assert.assertEquals(cacheChildren.keySet(), expectedNode.children.keySet(), path); for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() ) @@ -182,7 +223,7 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache ChildData childData = cacheChildren.get(nodeName); TestNode expectedChild = entry.getValue(); assertNodeEquals(childData, expectedChild); - assertTreeEquals(cache, expectedChild); + assertTreeEquals(cache, expectedChild, depth + 1); } }