Repository: curator
Updated Branches:
  refs/heads/CURATOR-33 [created] f4743336e


CURATOR-33 recursive TreeCache recipe


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/470d7e73
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/470d7e73
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/470d7e73

Branch: refs/heads/CURATOR-33
Commit: 470d7e73e1a491f255dd034f6ee9229b78710557
Parents: f5767c8
Author: Scott Blum <sco...@squareup.com>
Authored: Mon Jul 14 20:38:07 2014 -0400
Committer: Scott Blum <sco...@squareup.com>
Committed: Thu Jul 31 18:04:58 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/curator/utils/ZKPaths.java  |  16 +
 .../org/apache/curator/utils/TestZKPaths.java   |  11 +
 .../framework/recipes/cache/TreeCache.java      | 605 +++++++++++++++++++
 .../framework/recipes/cache/TreeCacheEvent.java | 126 ++++
 .../recipes/cache/TreeCacheListener.java        |  37 ++
 .../framework/recipes/cache/TestTreeCache.java  | 420 +++++++++++++
 6 files changed, 1215 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java 
b/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java
index 820f45f..352bfd6 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.utils;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -115,6 +116,21 @@ public class ZKPaths
         return new PathAndNode(parentPath, node);
     }
 
+    private static final Splitter PATH_SPLITTER = 
Splitter.on('/').omitEmptyStrings();
+
+    /**
+     * Given a full path, return the the individual parts, without slashes.
+     * The root path will return an empty list.
+     *
+     * @param path the path
+     * @return an array of parts
+     */
+    public static List<String> split(String path)
+    {
+        PathUtils.validatePath(path);
+        return PATH_SPLITTER.splitToList(path);
+    }
+
     /**
      * Make sure all the nodes in the path are created. NOTE: Unlike 
File.mkdirs(), Zookeeper doesn't distinguish
      * between directories and files. So, every node in the path is created. 
The data for each node is an empty blob

http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java 
b/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java
index fb49d8b..04d07c5 100644
--- a/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestZKPaths.java
@@ -21,6 +21,8 @@ package org.apache.curator.utils;
 
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Arrays;
+import java.util.Collections;
 
 public class TestZKPaths
 {
@@ -62,4 +64,13 @@ public class TestZKPaths
         Assert.assertEquals(ZKPaths.makePath("foo", "/bar"), "/foo/bar");
         Assert.assertEquals(ZKPaths.makePath("/foo", "/bar"), "/foo/bar");
     }
+
+    @Test
+    public void testSplit()
+    {
+        Assert.assertEquals(ZKPaths.split("/"), Collections.emptyList());
+        Assert.assertEquals(ZKPaths.split("/test"), 
Collections.singletonList("test"));
+        Assert.assertEquals(ZKPaths.split("/test/one"), Arrays.asList("test", 
"one"));
+        Assert.assertEquals(ZKPaths.split("/test/one/two"), 
Arrays.asList("test", "one", "two"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
new file mode 100644
index 0000000..4781253
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Maps;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>A utility that attempts to keep all data from all children of a ZK path 
locally cached. This class
+ * will watch the ZK path, respond to update/create/delete events, pull down 
the data, etc. You can
+ * register a listener that will get notified when changes occur.</p>
+ * <p></p>
+ * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. 
Users of this class must
+ * be prepared for false-positives and false-negatives. Additionally, always 
use the version number
+ * when updating data to avoid overwriting another process' change.</p>
+ */
+public class TreeCache implements Closeable
+{
+    private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
+
+    private enum NodeState
+    {
+        PENDING, LIVE, DEAD
+    }
+
+    final class TreeNode implements Watcher, BackgroundCallback
+    {
+        private final AtomicReference<NodeState> nodeState = new 
AtomicReference<NodeState>(NodeState.PENDING);
+        private final String path;
+        private final TreeNode parent;
+        private final AtomicReference<Stat> stat = new AtomicReference<Stat>();
+        private final AtomicReference<byte[]> data = new 
AtomicReference<byte[]>();
+        private final AtomicReference<ConcurrentMap<String, TreeNode>> 
children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
+
+        TreeNode(String path, TreeNode parent)
+        {
+            this.path = path;
+            this.parent = parent;
+        }
+
+        private void refreshChildren() throws Exception
+        {
+            outstandingOps.incrementAndGet();
+            
client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+        }
+
+        private void refreshData() throws Exception
+        {
+            outstandingOps.incrementAndGet();
+            if ( dataIsCompressed )
+            {
+                
client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
+            }
+            else
+            {
+                
client.getData().usingWatcher(this).inBackground(this).forPath(path);
+            }
+        }
+
+        private void wasReconnected() throws Exception
+        {
+            refreshData();
+            refreshChildren();
+            ConcurrentMap<String, TreeNode> childMap = children.get();
+            if ( childMap != null )
+            {
+                for ( TreeNode child : childMap.values() )
+                {
+                    child.wasReconnected();
+                }
+            }
+        }
+
+        private void wasCreated() throws Exception
+        {
+            refreshData();
+            refreshChildren();
+        }
+
+        private void wasDeleted() throws Exception
+        {
+            stat.set(null);
+            data.set(null);
+            client.clearWatcherReferences(this);
+            ConcurrentMap<String, TreeNode> childMap = 
children.getAndSet(null);
+            if ( childMap != null )
+            {
+                ArrayList<TreeNode> childCopy = new 
ArrayList<TreeNode>(childMap.values());
+                childMap.clear();
+                for ( TreeNode child : childCopy )
+                {
+                    child.wasDeleted();
+                }
+            }
+
+            if ( treeState.get() == TreeState.CLOSED )
+            {
+                return;
+            }
+
+            if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) )
+            {
+                publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path);
+            }
+
+            if ( parent == null )
+            {
+                // Root node; use an exist query to watch for existence.
+                
client.checkExists().usingWatcher(this).inBackground().forPath(path);
+            }
+            else
+            {
+                // Remove from parent if we're currently a child
+                ConcurrentMap<String, TreeNode> parentChildMap = 
parent.children.get();
+                if ( parentChildMap != null )
+                {
+                    parentChildMap.remove(ZKPaths.getNodeFromPath(path), this);
+                }
+            }
+        }
+
+        @Override
+        public void process(WatchedEvent event)
+        {
+            try
+            {
+                switch ( event.getType() )
+                {
+                case NodeCreated:
+                    assert parent == null;
+                    wasCreated();
+                    break;
+                case NodeChildrenChanged:
+                    refreshChildren();
+                    break;
+                case NodeDataChanged:
+                    refreshData();
+                    break;
+                case NodeDeleted:
+                    wasDeleted();
+                    break;
+                }
+            }
+            catch ( Exception e )
+            {
+                handleException(e);
+            }
+        }
+
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) 
throws Exception
+        {
+            switch ( event.getType() )
+            {
+            case EXISTS:
+                // TODO: should only happen for root node
+                if ( event.getResultCode() == 
KeeperException.Code.OK.intValue() )
+                {
+                    nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING);
+                    wasCreated();
+                }
+                else if ( event.getResultCode() == 
KeeperException.Code.NONODE.intValue() )
+                {
+                    wasDeleted();
+                }
+                break;
+            case CHILDREN:
+                if ( event.getResultCode() == 
KeeperException.Code.OK.intValue() )
+                {
+                    stat.set(event.getStat());
+
+                    if ( event.getChildren().isEmpty() )
+                    {
+                        break;
+                    }
+
+                    ConcurrentMap<String, TreeNode> childMap = children.get();
+                    if ( childMap == null )
+                    {
+                        childMap = Maps.newConcurrentMap();
+                        if ( !children.compareAndSet(null, childMap) )
+                        {
+                            childMap = children.get();
+                        }
+                    }
+
+                    // Present new children in sorted order for test 
determinism.
+                    List<String> newChildren = new ArrayList<String>();
+                    for ( String child : event.getChildren() )
+                    {
+                        if ( !childMap.containsKey(child) )
+                        {
+                            newChildren.add(child);
+                        }
+                    }
+
+                    Collections.sort(newChildren);
+                    for ( String child : newChildren )
+                    {
+                        String fullPath = ZKPaths.makePath(path, child);
+                        TreeNode node = new TreeNode(fullPath, this);
+                        if ( childMap.putIfAbsent(child, node) == null )
+                        {
+                            node.wasCreated();
+                        }
+                    }
+                }
+                else if ( event.getResultCode() == 
KeeperException.Code.NONODE.intValue() )
+                {
+                    wasDeleted();
+                }
+                break;
+            case GET_DATA:
+                if ( event.getResultCode() == 
KeeperException.Code.OK.intValue() )
+                {
+                    Stat oldStat = stat.getAndSet(event.getStat());
+                    if ( cacheData )
+                    {
+                        data.set(event.getData());
+                    }
+
+                    if ( nodeState.compareAndSet(NodeState.PENDING, 
NodeState.LIVE) )
+                    {
+                        publishEvent(TreeCacheEvent.Type.NODE_ADDED, new 
ChildData(event.getPath(), event.getStat(), event.getData()));
+                    }
+                    else if ( oldStat.getMzxid() != event.getStat().getMzxid() 
)
+                    {
+                        publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new 
ChildData(event.getPath(), event.getStat(), event.getData()));
+                    }
+                }
+                else if ( event.getResultCode() == 
KeeperException.Code.NONODE.intValue() )
+                {
+                    wasDeleted();
+                }
+                break;
+            default:
+                handleException(new Exception(String.format("Unknown event 
%s", event)));
+            }
+
+            if ( outstandingOps.decrementAndGet() == 0 )
+            {
+                if ( treeState.compareAndSet(TreeState.LATENT, 
TreeState.STARTED) )
+                {
+                    publishEvent(TreeCacheEvent.Type.INITIALIZED);
+                }
+            }
+        }
+    }
+
+    private enum TreeState
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * Detemines when to publish the initialized event.
+     */
+    private final AtomicLong outstandingOps = new AtomicLong(0);
+
+    private final TreeNode root;
+    private final CuratorFramework client;
+    private final CloseableExecutorService executorService;
+    private final boolean cacheData;
+    private final boolean dataIsCompressed;
+    private final ListenerContainer<TreeCacheListener> listeners = new 
ListenerContainer<TreeCacheListener>();
+    private final AtomicReference<TreeState> treeState = new 
AtomicReference<TreeState>(TreeState.LATENT);
+
+    private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState 
newState)
+        {
+            handleStateChange(newState);
+        }
+    };
+
+    private static final ThreadFactory defaultThreadFactory = 
ThreadUtils.newThreadFactory("TreeCache");
+
+    /**
+     * @param client    the client
+     * @param path      path to watch
+     * @param cacheData if true, node contents are cached in addition to the 
stat
+     */
+    public TreeCache(CuratorFramework client, String path, boolean cacheData)
+    {
+        this(client, path, cacheData, false, new 
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory),
 true));
+    }
+
+    /**
+     * @param client        the client
+     * @param path          path to watch
+     * @param cacheData     if true, node contents are cached in addition to 
the stat
+     * @param threadFactory factory to use when creating internal threads
+     */
+    public TreeCache(CuratorFramework client, String path, boolean cacheData, 
ThreadFactory threadFactory)
+    {
+        this(client, path, cacheData, false, new 
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), 
true));
+    }
+
+    /**
+     * @param client           the client
+     * @param path             path to watch
+     * @param cacheData        if true, node contents are cached in addition 
to the stat
+     * @param dataIsCompressed if true, data in the path is compressed
+     * @param threadFactory    factory to use when creating internal threads
+     */
+    public TreeCache(CuratorFramework client, String path, boolean cacheData, 
boolean dataIsCompressed, ThreadFactory threadFactory)
+    {
+        this(client, path, cacheData, dataIsCompressed, new 
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), 
true));
+    }
+
+    /**
+     * @param client           the client
+     * @param path             path to watch
+     * @param cacheData        if true, node contents are cached in addition 
to the stat
+     * @param dataIsCompressed if true, data in the path is compressed
+     * @param executorService  ExecutorService to use for the TreeCache's 
background thread
+     */
+    public TreeCache(CuratorFramework client, String path, boolean cacheData, 
boolean dataIsCompressed, final ExecutorService executorService)
+    {
+        this(client, path, cacheData, dataIsCompressed, new 
CloseableExecutorService(executorService));
+    }
+
+    /**
+     * @param client           the client
+     * @param path             path to watch
+     * @param cacheData        if true, node contents are cached in addition 
to the stat
+     * @param dataIsCompressed if true, data in the path is compressed
+     * @param executorService  Closeable ExecutorService to use for the 
TreeCache's background thread
+     */
+    public TreeCache(CuratorFramework client, String path, boolean cacheData, 
boolean dataIsCompressed, final CloseableExecutorService executorService)
+    {
+        this.root = new TreeNode(path, null);
+        this.client = client;
+        this.cacheData = cacheData;
+        this.dataIsCompressed = dataIsCompressed;
+        this.executorService = executorService;
+    }
+
+    /**
+     * Start the cache. The cache is not started automatically. You must call 
this method.
+     *
+     * @throws Exception errors
+     */
+    public void start() throws Exception
+    {
+        
client.getConnectionStateListenable().addListener(connectionStateListener);
+        root.wasCreated();
+    }
+
+    /**
+     * Close/end the cache
+     *
+     * @throws java.io.IOException errors
+     */
+    @Override
+    public void close() throws IOException
+    {
+        if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) )
+        {
+            
client.getConnectionStateListenable().removeListener(connectionStateListener);
+            listeners.clear();
+            executorService.close();
+            try
+            {
+                root.wasDeleted();
+            }
+            catch ( Exception e )
+            {
+                handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Return the cache listenable
+     *
+     * @return listenable
+     */
+    public ListenerContainer<TreeCacheListener> getListenable()
+    {
+        return listeners;
+    }
+
+    private TreeNode find(String fullPath)
+    {
+        if ( !fullPath.startsWith(root.path) )
+        {
+            return null;
+        }
+
+        TreeNode current = root;
+        if ( fullPath.length() > root.path.length() )
+        {
+            List<String> split = 
ZKPaths.split(fullPath.substring(root.path.length()));
+            for ( String part : split )
+            {
+                ConcurrentMap<String, TreeNode> map = current.children.get();
+                if ( map == null )
+                {
+                    return null;
+                }
+                current = map.get(part);
+                if ( current == null )
+                {
+                    return null;
+                }
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Return the current set of children. There are no guarantees of 
accuracy. This is
+     * merely the most recent view of the data. The data is returned in sorted 
order. If there is
+     * no child with that path, <code>null</code> is returned.
+     *
+     * @param fullPath full path to the node to check
+     * @return a possibly-empty list of children if the node is alive, or null
+     */
+    public SortedSet<String> getCurrentChildren(String fullPath)
+    {
+        TreeNode node = find(fullPath);
+        if ( node == null || node.nodeState.get() != NodeState.LIVE )
+        {
+            return null;
+        }
+        ConcurrentMap<String, TreeNode> map = node.children.get();
+        SortedSet<String> result;
+        if ( map == null )
+        {
+            result = ImmutableSortedSet.of();
+        }
+        else
+        {
+            result = ImmutableSortedSet.copyOf(map.keySet());
+        }
+
+        // Double-check liveness after retreiving children.
+        return node.nodeState.get() == NodeState.LIVE ? result : null;
+    }
+
+    /**
+     * Return the current data for the given path. There are no guarantees of 
accuracy. This is
+     * merely the most recent view of the data. If there is no child with that 
path,
+     * <code>null</code> is returned.
+     *
+     * @param fullPath full path to the node to check
+     * @return data if the node is alive, or null
+     */
+    public ChildData getCurrentData(String fullPath)
+    {
+        TreeNode node = find(fullPath);
+        if ( node == null || node.nodeState.get() != NodeState.LIVE )
+        {
+            return null;
+        }
+        ChildData result = new ChildData(node.path, node.stat.get(), 
node.data.get());
+        // Double-check liveness after retreiving stat / data.
+        return node.nodeState.get() == NodeState.LIVE ? result : null;
+    }
+
+    void callListeners(final TreeCacheEvent event)
+    {
+        listeners.forEach(new Function<TreeCacheListener, Void>()
+                          {
+                              @Override
+                              public Void apply(TreeCacheListener listener)
+                              {
+                                  try
+                                  {
+                                      listener.childEvent(client, event);
+                                  }
+                                  catch ( Exception e )
+                                  {
+                                      handleException(e);
+                                  }
+                                  return null;
+                              }
+                          }
+                         );
+    }
+
+    /**
+     * Default behavior is just to log the exception
+     *
+     * @param e the exception
+     */
+    protected void handleException(Throwable e)
+    {
+        LOG.error("", e);
+    }
+
+    private void handleStateChange(ConnectionState newState)
+    {
+        switch ( newState )
+        {
+        case SUSPENDED:
+            publishEvent(TreeCacheEvent.Type.CONNECTION_SUSPENDED);
+            break;
+
+        case LOST:
+            publishEvent(TreeCacheEvent.Type.CONNECTION_LOST);
+            break;
+
+        case RECONNECTED:
+            try
+            {
+                root.wasReconnected();
+                publishEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED);
+            }
+            catch ( Exception e )
+            {
+                handleException(e);
+            }
+            break;
+        }
+    }
+
+    private void publishEvent(TreeCacheEvent.Type type)
+    {
+        publishEvent(new TreeCacheEvent(type, null));
+    }
+
+    private void publishEvent(TreeCacheEvent.Type type, String path)
+    {
+        publishEvent(new TreeCacheEvent(type, new ChildData(path, null, 
null)));
+    }
+
+    private void publishEvent(TreeCacheEvent.Type type, ChildData data)
+    {
+        publishEvent(new TreeCacheEvent(type, data));
+    }
+
+    private void publishEvent(final TreeCacheEvent event)
+    {
+        if ( treeState.get() != TreeState.CLOSED )
+        {
+            executorService.submit(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    {
+                        try
+                        {
+                            callListeners(event);
+                        }
+                        catch ( Exception e )
+                        {
+                            handleException(e);
+                        }
+                    }
+                }
+            });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
new file mode 100644
index 0000000..2080d26
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+/**
+ * POJO that abstracts a change to a path
+ */
+public class TreeCacheEvent
+{
+    private final Type type;
+    private final ChildData data;
+
+    /**
+     * Type of change
+     */
+    public enum Type
+    {
+        /**
+         * A node was added.
+         */
+        NODE_ADDED,
+
+        /**
+         * A node's data was changed
+         */
+        NODE_UPDATED,
+
+        /**
+         * A node was removed from the tree
+         */
+        NODE_REMOVED,
+
+        /**
+         * Called when the connection has changed to {@link 
org.apache.curator.framework.state.ConnectionState#SUSPENDED}
+         * <p/>
+         * This is exposed so that users of the class can be notified of 
issues that *might* affect normal operation.
+         * The TreeCache is written such that listeners are not expected to do 
anything special on this
+         * event, except for those people who want to cause some 
application-specific logic to fire when this occurs.
+         * While the connection is down, the TreeCache will continue to have 
its state from before it lost
+         * the connection and after the connection is restored, the TreeCache 
will emit normal child events
+         * for all of the adds, deletes and updates that happened during the 
time that it was disconnected.
+         */
+        CONNECTION_SUSPENDED,
+
+        /**
+         * Called when the connection has changed to {@link 
org.apache.curator.framework.state.ConnectionState#RECONNECTED}
+         * <p/>
+         * This is exposed so that users of the class can be notified of 
issues that *might* affect normal operation.
+         * The TreeCache is written such that listeners are not expected to do 
anything special on this
+         * event, except for those people who want to cause some 
application-specific logic to fire when this occurs.
+         * While the connection is down, the TreeCache will continue to have 
its state from before it lost
+         * the connection and after the connection is restored, the TreeCache 
will emit normal child events
+         * for all of the adds, deletes and updates that happened during the 
time that it was disconnected.
+         */
+        CONNECTION_RECONNECTED,
+
+        /**
+         * Called when the connection has changed to {@link 
org.apache.curator.framework.state.ConnectionState#LOST}
+         * <p/>
+         * This is exposed so that users of the class can be notified of 
issues that *might* affect normal operation.
+         * The TreeCache is written such that listeners are not expected to do 
anything special on this
+         * event, except for those people who want to cause some 
application-specific logic to fire when this occurs.
+         * While the connection is down, the TreeCache will continue to have 
its state from before it lost
+         * the connection and after the connection is restored, the TreeCache 
will emit normal child events
+         * for all of the adds, deletes and updates that happened during the 
time that it was disconnected.
+         */
+        CONNECTION_LOST,
+
+        /**
+         * Posted when the initial cache has been populated.
+         */
+        INITIALIZED
+    }
+
+    /**
+     * @param type event type
+     * @param data event data or null
+     */
+    public TreeCacheEvent(Type type, ChildData data)
+    {
+        this.type = type;
+        this.data = data;
+    }
+
+    /**
+     * @return change type
+     */
+    public Type getType()
+    {
+        return type;
+    }
+
+    /**
+     * @return the node's data
+     */
+    public ChildData getData()
+    {
+        return data;
+    }
+
+    @Override
+    public String toString()
+    {
+        return TreeCacheEvent.class.getSimpleName() + "{" +
+            "type=" + type +
+            ", data=" + data +
+            '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListener.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListener.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListener.java
new file mode 100644
index 0000000..6ea342d
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListener.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Listener for {@link TreeCache} changes
+ */
+public interface TreeCacheListener
+{
+    /**
+     * Called when a change has occurred
+     *
+     * @param client the client
+     * @param event  describes the change
+     * @throws Exception errors
+     */
+    public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/470d7e73/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
new file mode 100644
index 0000000..2922d40
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class TestTreeCache extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+    private CuratorFramework client;
+    private TreeCache cache;
+    private List<Throwable> exceptions;
+    private BlockingQueue<TreeCacheEvent> events;
+    private TreeCacheListener eventListener;
+
+    /**
+     * A TreeCache that records exceptions.
+     */
+    private TreeCache newTreeCache(String path, boolean cacheData)
+    {
+        return new TreeCache(client, path, cacheData)
+        {
+            @Override
+            protected void handleException(Throwable e)
+            {
+                exceptions.add(e);
+            }
+        };
+    }
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        exceptions = new ArrayList<Throwable>();
+        events = new LinkedBlockingQueue<TreeCacheEvent>();
+        eventListener = new TreeCacheListener()
+        {
+            @Override
+            public void childEvent(CuratorFramework client, TreeCacheEvent 
event) throws Exception
+            {
+                events.add(event);
+            }
+        };
+
+        client = CuratorFrameworkFactory.newClient(server.getConnectString(), 
timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        client.getUnhandledErrorListenable().addListener(new 
UnhandledErrorListener()
+                                                         {
+                                                             @Override
+                                                             public void 
unhandledError(String message, Throwable e)
+                                                             {
+                                                                 
exceptions.add(e);
+                                                             }
+                                                         }
+                                                        );
+        cache = newTreeCache("/test", true);
+        cache.getListenable().addListener(eventListener);
+    }
+
+    @Override
+    @AfterMethod
+    public void teardown() throws Exception
+    {
+        try
+        {
+            try
+            {
+                for ( Throwable exception : exceptions )
+                {
+                    Assert.fail("Exception was thrown", exception);
+                }
+            }
+            finally
+            {
+                CloseableUtils.closeQuietly(cache);
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+        finally
+        {
+            super.teardown();
+        }
+    }
+
+    @Test
+    public void testStartup() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/1", 
"one".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2", 
"two".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/3", 
"three".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2/sub", 
"two-sub".getBytes());
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.getCurrentChildren("/test"), 
ImmutableSortedSet.of("1", "2", "3"));
+        Assert.assertEquals(cache.getCurrentChildren("/test/1"), 
ImmutableSortedSet.of());
+        Assert.assertEquals(cache.getCurrentChildren("/test/2"), 
ImmutableSortedSet.of("sub"));
+        Assert.assertNull(cache.getCurrentChildren("/test/non_exist"));
+    }
+
+    @Test
+    public void testStartEmpty() throws Exception
+    {
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+        client.create().forPath("/test");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testAsyncInitialPopulation() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testSyncInitialPopulation() throws Exception
+    {
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testChildrenInitialized() throws Exception
+    {
+        client.create().forPath("/test", "".getBytes());
+        client.create().forPath("/test/1", "1".getBytes());
+        client.create().forPath("/test/2", "2".getBytes());
+        client.create().forPath("/test/3", "3".getBytes());
+
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/1");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/3");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception
+    {
+        cache = newTreeCache("/test", false);
+        cache.getListenable().addListener(eventListener);
+
+        client.create().forPath("/test");
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+        client.create().forPath("/test/foo", "first".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo");
+
+        client.setData().forPath("/test/foo", "something new".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/foo");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/foo", "one".getBytes());
+        cache.start();
+
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+        client.delete().forPath("/test/foo");
+        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/foo");
+        client.create().forPath("/test/foo", "two".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo");
+
+        assertNoMoreEvents();
+    }
+
+    // see https://github.com/Netflix/curator/issues/27 - was caused by not 
comparing old->new data
+    @Test
+    public void testIssue27() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/a");
+        client.create().forPath("/test/b");
+        client.create().forPath("/test/c");
+
+        client.getChildren().forPath("/test");
+
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/a");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/b");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/c");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+        client.delete().forPath("/test/a");
+        client.create().forPath("/test/a");
+        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/a");
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/a");
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        client.create().forPath("/test");
+        cache.start();
+
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+        client.create().forPath("/test/foo", "foo".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo");
+        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", 
"data".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/me");
+
+        KillSession.kill(client.getZookeeperClient().getZooKeeper(), 
server.getConnectString());
+        assertEvent(TreeCacheEvent.Type.CONNECTION_SUSPENDED);
+        assertEvent(TreeCacheEvent.Type.CONNECTION_LOST);
+        assertEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED);
+        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me");
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testBasics() throws Exception
+    {
+        client.create().forPath("/test");
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+        Assert.assertEquals(cache.getCurrentChildren("/test"), 
ImmutableSortedSet.of());
+
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+        Assert.assertEquals(cache.getCurrentChildren("/test"), 
ImmutableSortedSet.of("one"));
+        Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "hey there");
+
+        client.setData().forPath("/test/one", "sup!".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one");
+        Assert.assertEquals(cache.getCurrentChildren("/test"), 
ImmutableSortedSet.of("one"));
+        Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "sup!");
+
+        client.delete().forPath("/test/one");
+        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one");
+        Assert.assertEquals(cache.getCurrentChildren("/test"), 
ImmutableSortedSet.of());
+
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testBasicsOnTwoCaches() throws Exception
+    {
+        TreeCache cache2 = newTreeCache("/test", true);
+
+        // Just ensures the same event count; enables test flow control on 
cache2.
+        final Semaphore semaphore = new Semaphore(0);
+        cache2.getListenable().addListener(new TreeCacheListener()
+        {
+            @Override
+            public void childEvent(CuratorFramework client, TreeCacheEvent 
event) throws Exception
+            {
+                semaphore.release();
+            }
+        });
+
+        try
+        {
+            client.create().forPath("/test");
+            cache.start();
+            cache2.start();
+
+            assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+            assertEvent(TreeCacheEvent.Type.INITIALIZED);
+            semaphore.acquire(2);
+
+            client.create().forPath("/test/one", "hey there".getBytes());
+            assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+            Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "hey there");
+            semaphore.acquire();
+            Assert.assertEquals(new 
String(cache2.getCurrentData("/test/one").getData()), "hey there");
+
+            client.setData().forPath("/test/one", "sup!".getBytes());
+            assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one");
+            Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "sup!");
+            semaphore.acquire();
+            Assert.assertEquals(new 
String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+            client.delete().forPath("/test/one");
+            assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one");
+            Assert.assertNull(cache.getCurrentData("/test/one"));
+            semaphore.acquire();
+            Assert.assertNull(cache2.getCurrentData("/test/one"));
+
+            assertNoMoreEvents();
+            Assert.assertEquals(semaphore.availablePermits(), 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache2);
+        }
+    }
+
+    @Test
+    public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception
+    {
+        client.create().forPath("/test");
+
+        cache.start();
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+        assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+        client.create().forPath("/test/one", "hey there".getBytes());
+        assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+        Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "hey there");
+
+        cache.close();
+        assertNoMoreEvents();
+
+        client.delete().forPath("/test/one");
+        assertNoMoreEvents();
+    }
+
+    private void assertNoMoreEvents() throws InterruptedException
+    {
+        timing.sleepABit();
+        Assert.assertTrue(events.isEmpty());
+    }
+
+    private TreeCacheEvent assertEvent(TreeCacheEvent.Type expectedType) 
throws InterruptedException
+    {
+        return assertEvent(expectedType, null);
+    }
+
+    private TreeCacheEvent assertEvent(TreeCacheEvent.Type expectedType, 
String expectedPath) throws InterruptedException
+    {
+        return assertEvent(expectedType, expectedPath, null);
+    }
+
+    private TreeCacheEvent assertEvent(TreeCacheEvent.Type expectedType, 
String expectedPath, byte[] expectedData) throws InterruptedException
+    {
+        TreeCacheEvent event = events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+        Assert.assertEquals(event.getType(), expectedType);
+        if ( expectedPath == null )
+        {
+            Assert.assertNull(event.getData());
+        }
+        else
+        {
+            Assert.assertEquals(event.getData().getPath(), expectedPath);
+        }
+        if ( expectedData != null )
+        {
+            Assert.assertEquals(event.getData().getData(), expectedData);
+        }
+        return event;
+    }
+}

Reply via email to