Repository: curator Updated Branches: refs/heads/CURATOR-33 [created] f4743336e
CURATOR-33 recursive TreeCache recipe Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/470d7e73 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/470d7e73 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/470d7e73 Branch: refs/heads/CURATOR-33 Commit: 470d7e73e1a491f255dd034f6ee9229b78710557 Parents: f5767c8 Author: Scott Blum <sco...@squareup.com> Authored: Mon Jul 14 20:38:07 2014 -0400 Committer: Scott Blum <sco...@squareup.com> Committed: Thu Jul 31 18:04:58 2014 -0400 ---------------------------------------------------------------------- .../java/org/apache/curator/utils/ZKPaths.java | 16 + .../org/apache/curator/utils/TestZKPaths.java | 11 + .../framework/recipes/cache/TreeCache.java | 605 +++++++++++++++++++ .../framework/recipes/cache/TreeCacheEvent.java | 126 ++++ .../recipes/cache/TreeCacheListener.java | 37 ++ .../framework/recipes/cache/TestTreeCache.java | 420 +++++++++++++ 6 files changed, 1215 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java b/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java index 820f45f..352bfd6 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java +++ b/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java @@ -19,6 +19,7 @@ package org.apache.curator.utils; +import com.google.common.base.Splitter; import com.google.common.collect.Lists; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -115,6 +116,21 @@ public class ZKPaths return new PathAndNode(parentPath, node); } + private static final Splitter PATH_SPLITTER = 
Splitter.on('/').omitEmptyStrings(); + + /** + * Given a full path, return the individual parts, without slashes. + * The root path will return an empty list. + * + * @param path the path + * @return a list of parts + */ + public static List<String> split(String path) + { + PathUtils.validatePath(path); + return PATH_SPLITTER.splitToList(path); + } + /** * Make sure all the nodes in the path are created. NOTE: Unlike File.mkdirs(), Zookeeper doesn't distinguish * between directories and files. So, every node in the path is created. The data for each node is an empty blob http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java ---------------------------------------------------------------------- diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java b/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java index fb49d8b..04d07c5 100644 --- a/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java +++ b/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java @@ -21,6 +21,8 @@ package org.apache.curator.utils; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.Arrays; +import java.util.Collections; public class TestZKPaths { @@ -62,4 +64,13 @@ public class TestZKPaths Assert.assertEquals(ZKPaths.makePath("foo", "/bar"), "/foo/bar"); Assert.assertEquals(ZKPaths.makePath("/foo", "/bar"), "/foo/bar"); } + + @Test + public void testSplit() + { + Assert.assertEquals(ZKPaths.split("/"), Collections.emptyList()); + Assert.assertEquals(ZKPaths.split("/test"), Collections.singletonList("test")); + Assert.assertEquals(ZKPaths.split("/test/one"), Arrays.asList("test", "one")); + Assert.assertEquals(ZKPaths.split("/test/one/two"), Arrays.asList("test", "one", "two")); + } } 
http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java new file mode 100644 index 0000000..4781253 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -0,0 +1,605 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator.framework.recipes.cache; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Maps; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class + * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can + * register a listener that will get notified when changes occur.</p> + * <p></p> + * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must + * be prepared for false-positives and false-negatives. 
Additionally, always use the version number + * when updating data to avoid overwriting another process' change.</p> + */ +public class TreeCache implements Closeable +{ + private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class); + + private enum NodeState + { + PENDING, LIVE, DEAD + } + + final class TreeNode implements Watcher, BackgroundCallback + { + private final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING); + private final String path; + private final TreeNode parent; + private final AtomicReference<Stat> stat = new AtomicReference<Stat>(); + private final AtomicReference<byte[]> data = new AtomicReference<byte[]>(); + private final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>(); + + TreeNode(String path, TreeNode parent) + { + this.path = path; + this.parent = parent; + } + + private void refreshChildren() throws Exception + { + outstandingOps.incrementAndGet(); + client.getChildren().usingWatcher(this).inBackground(this).forPath(path); + } + + private void refreshData() throws Exception + { + outstandingOps.incrementAndGet(); + if ( dataIsCompressed ) + { + client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path); + } + else + { + client.getData().usingWatcher(this).inBackground(this).forPath(path); + } + } + + private void wasReconnected() throws Exception + { + refreshData(); + refreshChildren(); + ConcurrentMap<String, TreeNode> childMap = children.get(); + if ( childMap != null ) + { + for ( TreeNode child : childMap.values() ) + { + child.wasReconnected(); + } + } + } + + private void wasCreated() throws Exception + { + refreshData(); + refreshChildren(); + } + + private void wasDeleted() throws Exception + { + stat.set(null); + data.set(null); + client.clearWatcherReferences(this); + ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null); + if ( childMap != null ) + { + 
ArrayList<TreeNode> childCopy = new ArrayList<TreeNode>(childMap.values()); + childMap.clear(); + for ( TreeNode child : childCopy ) + { + child.wasDeleted(); + } + } + + if ( treeState.get() == TreeState.CLOSED ) + { + return; + } + + if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) ) + { + publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path); + } + + if ( parent == null ) + { + // Root node; use an exist query to watch for existence. + client.checkExists().usingWatcher(this).inBackground().forPath(path); + } + else + { + // Remove from parent if we're currently a child + ConcurrentMap<String, TreeNode> parentChildMap = parent.children.get(); + if ( parentChildMap != null ) + { + parentChildMap.remove(ZKPaths.getNodeFromPath(path), this); + } + } + } + + @Override + public void process(WatchedEvent event) + { + try + { + switch ( event.getType() ) + { + case NodeCreated: + assert parent == null; + wasCreated(); + break; + case NodeChildrenChanged: + refreshChildren(); + break; + case NodeDataChanged: + refreshData(); + break; + case NodeDeleted: + wasDeleted(); + break; + } + } + catch ( Exception e ) + { + handleException(e); + } + } + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + switch ( event.getType() ) + { + case EXISTS: + // TODO: should only happen for root node + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING); + wasCreated(); + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + case CHILDREN: + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + stat.set(event.getStat()); + + if ( event.getChildren().isEmpty() ) + { + break; + } + + ConcurrentMap<String, TreeNode> childMap = children.get(); + if ( childMap == null ) + { + childMap = Maps.newConcurrentMap(); + if ( !children.compareAndSet(null, childMap) ) + { + 
childMap = children.get(); + } + } + + // Present new children in sorted order for test determinism. + List<String> newChildren = new ArrayList<String>(); + for ( String child : event.getChildren() ) + { + if ( !childMap.containsKey(child) ) + { + newChildren.add(child); + } + } + + Collections.sort(newChildren); + for ( String child : newChildren ) + { + String fullPath = ZKPaths.makePath(path, child); + TreeNode node = new TreeNode(fullPath, this); + if ( childMap.putIfAbsent(child, node) == null ) + { + node.wasCreated(); + } + } + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + case GET_DATA: + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + Stat oldStat = stat.getAndSet(event.getStat()); + if ( cacheData ) + { + data.set(event.getData()); + } + + if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) ) + { + publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), event.getStat(), event.getData())); + } + else if ( oldStat.getMzxid() != event.getStat().getMzxid() ) + { + publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), event.getStat(), event.getData())); + } + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + default: + handleException(new Exception(String.format("Unknown event %s", event))); + } + + if ( outstandingOps.decrementAndGet() == 0 ) + { + if ( treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED) ) + { + publishEvent(TreeCacheEvent.Type.INITIALIZED); + } + } + } + } + + private enum TreeState + { + LATENT, + STARTED, + CLOSED + } + + /** + * Determines when to publish the initialized event. 
+ */ + private final AtomicLong outstandingOps = new AtomicLong(0); + + private final TreeNode root; + private final CuratorFramework client; + private final CloseableExecutorService executorService; + private final boolean cacheData; + private final boolean dataIsCompressed; + private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>(); + private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT); + + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + handleStateChange(newState); + } + }; + + private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache"); + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData) + { + this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param threadFactory factory to use when creating internal threads + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) + { + this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param threadFactory factory to use when creating internal threads + */ + public TreeCache(CuratorFramework client, 
String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) + { + this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param executorService ExecutorService to use for the TreeCache's background thread + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService) + { + this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param executorService Closeable ExecutorService to use for the TreeCache's background thread + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) + { + this.root = new TreeNode(path, null); + this.client = client; + this.cacheData = cacheData; + this.dataIsCompressed = dataIsCompressed; + this.executorService = executorService; + } + + /** + * Start the cache. The cache is not started automatically. You must call this method. 
+ * + * @throws Exception errors + */ + public void start() throws Exception + { + client.getConnectionStateListenable().addListener(connectionStateListener); + root.wasCreated(); + } + + /** + * Close/end the cache + * + * @throws java.io.IOException errors + */ + @Override + public void close() throws IOException + { + if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) ) + { + client.getConnectionStateListenable().removeListener(connectionStateListener); + listeners.clear(); + executorService.close(); + try + { + root.wasDeleted(); + } + catch ( Exception e ) + { + handleException(e); + } + } + } + + /** + * Return the cache listenable + * + * @return listenable + */ + public ListenerContainer<TreeCacheListener> getListenable() + { + return listeners; + } + + private TreeNode find(String fullPath) + { + if ( !fullPath.startsWith(root.path) ) + { + return null; + } + + TreeNode current = root; + if ( fullPath.length() > root.path.length() ) + { + List<String> split = ZKPaths.split(fullPath.substring(root.path.length())); + for ( String part : split ) + { + ConcurrentMap<String, TreeNode> map = current.children.get(); + if ( map == null ) + { + return null; + } + current = map.get(part); + if ( current == null ) + { + return null; + } + } + } + return current; + } + + /** + * Return the current set of children. There are no guarantees of accuracy. This is + * merely the most recent view of the data. The data is returned in sorted order. If there is + * no child with that path, <code>null</code> is returned. 
+ * + * @param fullPath full path to the node to check + * @return a possibly-empty list of children if the node is alive, or null + */ + public SortedSet<String> getCurrentChildren(String fullPath) + { + TreeNode node = find(fullPath); + if ( node == null || node.nodeState.get() != NodeState.LIVE ) + { + return null; + } + ConcurrentMap<String, TreeNode> map = node.children.get(); + SortedSet<String> result; + if ( map == null ) + { + result = ImmutableSortedSet.of(); + } + else + { + result = ImmutableSortedSet.copyOf(map.keySet()); + } + + // Double-check liveness after retrieving children. + return node.nodeState.get() == NodeState.LIVE ? result : null; + } + + /** + * Return the current data for the given path. There are no guarantees of accuracy. This is + * merely the most recent view of the data. If there is no child with that path, + * <code>null</code> is returned. + * + * @param fullPath full path to the node to check + * @return data if the node is alive, or null + */ + public ChildData getCurrentData(String fullPath) + { + TreeNode node = find(fullPath); + if ( node == null || node.nodeState.get() != NodeState.LIVE ) + { + return null; + } + ChildData result = new ChildData(node.path, node.stat.get(), node.data.get()); + // Double-check liveness after retrieving stat / data. + return node.nodeState.get() == NodeState.LIVE ? 
result : null; + } + + void callListeners(final TreeCacheEvent event) + { + listeners.forEach(new Function<TreeCacheListener, Void>() + { + @Override + public Void apply(TreeCacheListener listener) + { + try + { + listener.childEvent(client, event); + } + catch ( Exception e ) + { + handleException(e); + } + return null; + } + } + ); + } + + /** + * Default behavior is just to log the exception + * + * @param e the exception + */ + protected void handleException(Throwable e) + { + LOG.error("", e); + } + + private void handleStateChange(ConnectionState newState) + { + switch ( newState ) + { + case SUSPENDED: + publishEvent(TreeCacheEvent.Type.CONNECTION_SUSPENDED); + break; + + case LOST: + publishEvent(TreeCacheEvent.Type.CONNECTION_LOST); + break; + + case RECONNECTED: + try + { + root.wasReconnected(); + publishEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED); + } + catch ( Exception e ) + { + handleException(e); + } + break; + } + } + + private void publishEvent(TreeCacheEvent.Type type) + { + publishEvent(new TreeCacheEvent(type, null)); + } + + private void publishEvent(TreeCacheEvent.Type type, String path) + { + publishEvent(new TreeCacheEvent(type, new ChildData(path, null, null))); + } + + private void publishEvent(TreeCacheEvent.Type type, ChildData data) + { + publishEvent(new TreeCacheEvent(type, data)); + } + + private void publishEvent(final TreeCacheEvent event) + { + if ( treeState.get() != TreeState.CLOSED ) + { + executorService.submit(new Runnable() + { + @Override + public void run() + { + { + try + { + callListeners(event); + } + catch ( Exception e ) + { + handleException(e); + } + } + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java new file mode 100644 index 0000000..2080d26 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +/** + * POJO that abstracts a change to a path + */ +public class TreeCacheEvent +{ + private final Type type; + private final ChildData data; + + /** + * Type of change + */ + public enum Type + { + /** + * A node was added. + */ + NODE_ADDED, + + /** + * A node's data was changed + */ + NODE_UPDATED, + + /** + * A node was removed from the tree + */ + NODE_REMOVED, + + /** + * Called when the connection has changed to {@link org.apache.curator.framework.state.ConnectionState#SUSPENDED} + * <p/> + * This is exposed so that users of the class can be notified of issues that *might* affect normal operation. + * The TreeCache is written such that listeners are not expected to do anything special on this + * event, except for those people who want to cause some application-specific logic to fire when this occurs. 
+ * While the connection is down, the TreeCache will continue to have its state from before it lost + * the connection and after the connection is restored, the TreeCache will emit normal child events + * for all of the adds, deletes and updates that happened during the time that it was disconnected. + */ + CONNECTION_SUSPENDED, + + /** + * Called when the connection has changed to {@link org.apache.curator.framework.state.ConnectionState#RECONNECTED} + * <p/> + * This is exposed so that users of the class can be notified of issues that *might* affect normal operation. + * The TreeCache is written such that listeners are not expected to do anything special on this + * event, except for those people who want to cause some application-specific logic to fire when this occurs. + * While the connection is down, the TreeCache will continue to have its state from before it lost + * the connection and after the connection is restored, the TreeCache will emit normal child events + * for all of the adds, deletes and updates that happened during the time that it was disconnected. + */ + CONNECTION_RECONNECTED, + + /** + * Called when the connection has changed to {@link org.apache.curator.framework.state.ConnectionState#LOST} + * <p/> + * This is exposed so that users of the class can be notified of issues that *might* affect normal operation. + * The TreeCache is written such that listeners are not expected to do anything special on this + * event, except for those people who want to cause some application-specific logic to fire when this occurs. + * While the connection is down, the TreeCache will continue to have its state from before it lost + * the connection and after the connection is restored, the TreeCache will emit normal child events + * for all of the adds, deletes and updates that happened during the time that it was disconnected. + */ + CONNECTION_LOST, + + /** + * Posted when the initial cache has been populated. 
+ */ + INITIALIZED + } + + /** + * @param type event type + * @param data event data or null + */ + public TreeCacheEvent(Type type, ChildData data) + { + this.type = type; + this.data = data; + } + + /** + * @return change type + */ + public Type getType() + { + return type; + } + + /** + * @return the node's data + */ + public ChildData getData() + { + return data; + } + + @Override + public String toString() + { + return TreeCacheEvent.class.getSimpleName() + "{" + + "type=" + type + + ", data=" + data + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListener.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListener.java new file mode 100644 index 0000000..6ea342d --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListener.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; + +/** + * Listener for {@link TreeCache} changes + */ +public interface TreeCacheListener +{ + /** + * Called when a change has occurred + * + * @param client the client + * @param event describes the change + * @throws Exception errors + */ + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java new file mode 100644 index 0000000..2922d40 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.ImmutableSortedSet; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.KillSession; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.CreateMode; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +public class TestTreeCache extends BaseClassForTests +{ + private final Timing timing = new Timing(); + private CuratorFramework client; + private TreeCache cache; + private List<Throwable> exceptions; + private BlockingQueue<TreeCacheEvent> events; + private TreeCacheListener eventListener; + + /** + * A TreeCache that records exceptions. 
+ */ + private TreeCache newTreeCache(String path, boolean cacheData) + { + return new TreeCache(client, path, cacheData) + { + @Override + protected void handleException(Throwable e) + { + exceptions.add(e); + } + }; + } + + @Override + @BeforeMethod + public void setup() throws Exception + { + super.setup(); + + exceptions = new ArrayList<Throwable>(); + events = new LinkedBlockingQueue<TreeCacheEvent>(); + eventListener = new TreeCacheListener() + { + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception + { + events.add(event); + } + }; + + client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() + { + @Override + public void unhandledError(String message, Throwable e) + { + exceptions.add(e); + } + } + ); + cache = newTreeCache("/test", true); + cache.getListenable().addListener(eventListener); + } + + @Override + @AfterMethod + public void teardown() throws Exception + { + try + { + try + { + for ( Throwable exception : exceptions ) + { + Assert.fail("Exception was thrown", exception); + } + } + finally + { + CloseableUtils.closeQuietly(cache); + CloseableUtils.closeQuietly(client); + } + } + finally + { + super.teardown(); + } + } + + @Test + public void testStartup() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/1", "one".getBytes()); + client.create().forPath("/test/2", "two".getBytes()); + client.create().forPath("/test/3", "three".getBytes()); + client.create().forPath("/test/2/sub", "two-sub".getBytes()); + + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/1", "one".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2", "two".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/3", 
"three".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2/sub", "two-sub".getBytes()); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + assertNoMoreEvents(); + + Assert.assertEquals(cache.getCurrentChildren("/test"), ImmutableSortedSet.of("1", "2", "3")); + Assert.assertEquals(cache.getCurrentChildren("/test/1"), ImmutableSortedSet.of()); + Assert.assertEquals(cache.getCurrentChildren("/test/2"), ImmutableSortedSet.of("sub")); + Assert.assertNull(cache.getCurrentChildren("/test/non_exist")); + } + + @Test + public void testStartEmpty() throws Exception + { + cache.start(); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + + client.create().forPath("/test"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertNoMoreEvents(); + } + + @Test + public void testAsyncInitialPopulation() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/one", "hey there".getBytes()); + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + assertNoMoreEvents(); + } + + @Test + public void testSyncInitialPopulation() throws Exception + { + cache.start(); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + client.create().forPath("/test"); + client.create().forPath("/test/one", "hey there".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); + assertNoMoreEvents(); + } + + @Test + public void testChildrenInitialized() throws Exception + { + client.create().forPath("/test", "".getBytes()); + client.create().forPath("/test/1", "1".getBytes()); + client.create().forPath("/test/2", "2".getBytes()); + client.create().forPath("/test/3", "3".getBytes()); + + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/1"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, 
"/test/2"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/3"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + assertNoMoreEvents(); + } + + @Test + public void testUpdateWhenNotCachingData() throws Exception + { + cache = newTreeCache("/test", false); + cache.getListenable().addListener(eventListener); + + client.create().forPath("/test"); + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + + client.create().forPath("/test/foo", "first".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); + + client.setData().forPath("/test/foo", "something new".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/foo"); + assertNoMoreEvents(); + } + + @Test + public void testDeleteThenCreate() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/foo", "one".getBytes()); + cache.start(); + + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + + client.delete().forPath("/test/foo"); + assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/foo"); + client.create().forPath("/test/foo", "two".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); + + assertNoMoreEvents(); + } + + // see https://github.com/Netflix/curator/issues/27 - was caused by not comparing old->new data + @Test + public void testIssue27() throws Exception + { + client.create().forPath("/test"); + client.create().forPath("/test/a"); + client.create().forPath("/test/b"); + client.create().forPath("/test/c"); + + client.getChildren().forPath("/test"); + + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/a"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/b"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/c"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); 
+ + client.delete().forPath("/test/a"); + client.create().forPath("/test/a"); + assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/a"); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/a"); + + assertNoMoreEvents(); + } + + @Test + public void testKilledSession() throws Exception + { + client.create().forPath("/test"); + cache.start(); + + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + + client.create().forPath("/test/foo", "foo".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); + client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/me"); + + KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString()); + assertEvent(TreeCacheEvent.Type.CONNECTION_SUSPENDED); + assertEvent(TreeCacheEvent.Type.CONNECTION_LOST); + assertEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED); + assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me"); + + assertNoMoreEvents(); + } + + @Test + public void testBasics() throws Exception + { + client.create().forPath("/test"); + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + Assert.assertEquals(cache.getCurrentChildren("/test"), ImmutableSortedSet.of()); + + client.create().forPath("/test/one", "hey there".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); + Assert.assertEquals(cache.getCurrentChildren("/test"), ImmutableSortedSet.of("one")); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); + + client.setData().forPath("/test/one", "sup!".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one"); + Assert.assertEquals(cache.getCurrentChildren("/test"), ImmutableSortedSet.of("one")); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); + + 
client.delete().forPath("/test/one"); + assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one"); + Assert.assertEquals(cache.getCurrentChildren("/test"), ImmutableSortedSet.of()); + + assertNoMoreEvents(); + } + + @Test + public void testBasicsOnTwoCaches() throws Exception + { + TreeCache cache2 = newTreeCache("/test", true); + + // Just ensures the same event count; enables test flow control on cache2. + final Semaphore semaphore = new Semaphore(0); + cache2.getListenable().addListener(new TreeCacheListener() + { + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception + { + semaphore.release(); + } + }); + + try + { + client.create().forPath("/test"); + cache.start(); + cache2.start(); + + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + semaphore.acquire(2); + + client.create().forPath("/test/one", "hey there".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); + semaphore.acquire(); + Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "hey there"); + + client.setData().forPath("/test/one", "sup!".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one"); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); + semaphore.acquire(); + Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!"); + + client.delete().forPath("/test/one"); + assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one"); + Assert.assertNull(cache.getCurrentData("/test/one")); + semaphore.acquire(); + Assert.assertNull(cache2.getCurrentData("/test/one")); + + assertNoMoreEvents(); + Assert.assertEquals(semaphore.availablePermits(), 0); + } + finally + { + CloseableUtils.closeQuietly(cache2); + } + } + + @Test + public void testDeleteNodeAfterCloseDoesntCallExecutor() throws 
Exception + { + client.create().forPath("/test"); + + cache.start(); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); + assertEvent(TreeCacheEvent.Type.INITIALIZED); + + client.create().forPath("/test/one", "hey there".getBytes()); + assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); + + cache.close(); + assertNoMoreEvents(); + + client.delete().forPath("/test/one"); + assertNoMoreEvents(); + } + + private void assertNoMoreEvents() throws InterruptedException + { + timing.sleepABit(); + Assert.assertTrue(events.isEmpty()); + } + + private TreeCacheEvent assertEvent(TreeCacheEvent.Type expectedType) throws InterruptedException + { + return assertEvent(expectedType, null); + } + + private TreeCacheEvent assertEvent(TreeCacheEvent.Type expectedType, String expectedPath) throws InterruptedException + { + return assertEvent(expectedType, expectedPath, null); + } + + private TreeCacheEvent assertEvent(TreeCacheEvent.Type expectedType, String expectedPath, byte[] expectedData) throws InterruptedException + { + TreeCacheEvent event = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS); + Assert.assertEquals(event.getType(), expectedType); + if ( expectedPath == null ) + { + Assert.assertNull(event.getData()); + } + else + { + Assert.assertEquals(event.getData().getPath(), expectedPath); + } + if ( expectedData != null ) + { + Assert.assertEquals(event.getData().getData(), expectedData); + } + return event; + } +}