[
https://issues.apache.org/jira/browse/CURATOR-33?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14081629#comment-14081629
]
ASF GitHub Bot commented on CURATOR-33:
---------------------------------------
Github user dragonsinth commented on a diff in the pull request:
https://github.com/apache/curator/pull/17#discussion_r15674624
--- Diff:
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
---
@@ -0,0 +1,600 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Maps;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>A utility that attempts to keep all data from all children of a ZK
path locally cached. This class
+ * will watch the ZK path, respond to update/create/delete events, pull
down the data, etc. You can
+ * register a listener that will get notified when changes occur.</p>
+ * <p></p>
+ * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in
sync. Users of this class must
+ * be prepared for false-positives and false-negatives. Additionally,
always use the version number
+ * when updating data to avoid overwriting another process' change.</p>
+ */
+public class TreeCache implements Closeable
+{
+ private static final Logger LOG =
LoggerFactory.getLogger(TreeCache.class);
+
+ private enum NodeState
+ {
+ PENDING, LIVE, DEAD
+ }
+
+ final class TreeNode implements Watcher, BackgroundCallback
+ {
+ private final AtomicReference<NodeState> nodeState = new
AtomicReference<NodeState>(NodeState.PENDING);
+ private final String path;
+ private final TreeNode parent;
+ private final AtomicReference<Stat> stat = new
AtomicReference<Stat>();
+ private final AtomicReference<byte[]> data = new
AtomicReference<byte[]>();
+ private final AtomicReference<ConcurrentMap<String, TreeNode>>
children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
+
+ TreeNode(String path, TreeNode parent)
+ {
+ this.path = path;
+ this.parent = parent;
+ }
+
+ private void refreshChildren() throws Exception
+ {
+ outstandingOps.incrementAndGet();
+
client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+ }
+
+ private void refreshData() throws Exception
+ {
+ outstandingOps.incrementAndGet();
+ if ( dataIsCompressed )
+ {
+
client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
+ }
+ else
+ {
+
client.getData().usingWatcher(this).inBackground(this).forPath(path);
+ }
+ }
+
+ private void wasReconnected() throws Exception
+ {
+ refreshData();
+ refreshChildren();
+ ConcurrentMap<String, TreeNode> childMap = children.get();
+ if ( childMap != null )
+ {
+ for ( TreeNode child : childMap.values() )
+ {
+ child.wasReconnected();
+ }
+ }
+ }
+
+ private void wasCreated() throws Exception
+ {
+ refreshData();
+ refreshChildren();
+ }
+
+ private void wasDeleted() throws Exception
+ {
+ stat.set(null);
+ data.set(null);
+ client.clearWatcherReferences(this);
+ ConcurrentMap<String, TreeNode> childMap =
children.getAndSet(null);
+ if ( childMap != null )
+ {
+ ArrayList<TreeNode> childCopy = new
ArrayList<TreeNode>(childMap.values());
+ childMap.clear();
+ for ( TreeNode child : childCopy )
+ {
+ child.wasDeleted();
+ }
+ }
+
+ if ( treeState.get() == TreeState.CLOSED )
+ {
+ return;
+ }
+
+ if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) )
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path);
+ }
+
+ if ( parent == null )
+ {
+ // Root node; use an exist query to watch for existence.
+
client.checkExists().usingWatcher(this).inBackground().forPath(path);
+ }
+ else
+ {
+ // Remove from parent if we're currently a child
+ ConcurrentMap<String, TreeNode> parentChildMap =
parent.children.get();
+ if ( parentChildMap != null )
+ {
+ parentChildMap.remove(ZKPaths.getNodeFromPath(path),
this);
+ }
+ }
+ }
+
+ @Override
+ public void process(WatchedEvent event)
+ {
+ System.out.println(event);
+ try
+ {
+ switch ( event.getType() )
+ {
+ case NodeCreated:
+ assert parent == null;
+ wasCreated();
+ break;
+ case NodeChildrenChanged:
+ refreshChildren();
+ break;
+ case NodeDataChanged:
+ refreshData();
+ break;
+ case NodeDeleted:
+ wasDeleted();
+ break;
+ }
+ }
+ catch ( Exception e )
+ {
+ handleException(e);
+ }
+ }
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent
event) throws Exception
+ {
+ System.out.println(event);
+ switch ( event.getType() )
+ {
+ case EXISTS:
+ // TODO: should only happen for root node
+ if ( event.getResultCode() ==
KeeperException.Code.OK.intValue() )
+ {
+ nodeState.compareAndSet(NodeState.DEAD,
NodeState.PENDING);
+ wasCreated();
+ }
+ else if ( event.getResultCode() ==
KeeperException.Code.NONODE.intValue() )
+ {
+ wasDeleted();
+ }
+ break;
+ case CHILDREN:
+ if ( event.getResultCode() ==
KeeperException.Code.OK.intValue() )
+ {
+ stat.set(event.getStat());
+
+ if ( event.getChildren().isEmpty() )
+ {
+ break;
+ }
+
+ ConcurrentMap<String, TreeNode> childMap =
children.get();
+ if ( childMap == null )
+ {
+ childMap = Maps.newConcurrentMap();
+ if ( !children.compareAndSet(null, childMap) )
+ {
+ childMap = children.get();
+ }
+ }
+
+ for ( String child : event.getChildren() )
+ {
+ String fullPath = ZKPaths.makePath(path, child);
+ if ( !childMap.containsKey(child) )
+ {
+ TreeNode node = new TreeNode(fullPath, this);
+ if ( childMap.putIfAbsent(child, node) == null
)
+ {
+ node.wasCreated();
+ }
+ }
+ }
+ }
+ else if ( event.getResultCode() ==
KeeperException.Code.NONODE.intValue() )
+ {
+ wasDeleted();
+ }
+ break;
+ case GET_DATA:
+ if ( event.getResultCode() ==
KeeperException.Code.OK.intValue() )
+ {
+ Stat oldStat = stat.getAndSet(event.getStat());
+ if ( cacheData )
+ {
+ data.set(event.getData());
+ }
+
+ if ( nodeState.compareAndSet(NodeState.PENDING,
NodeState.LIVE) )
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_ADDED, new
ChildData(event.getPath(), event.getStat(), event.getData()));
+ }
+ else if ( oldStat.getMzxid() !=
event.getStat().getMzxid() )
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new
ChildData(event.getPath(), event.getStat(), event.getData()));
+ }
+ }
+ else if ( event.getResultCode() ==
KeeperException.Code.NONODE.intValue() )
+ {
+ wasDeleted();
+ }
+ break;
+ default:
+ handleException(new Exception(String.format("Unknown event
%s", event)));
+ }
+
+ if ( outstandingOps.decrementAndGet() == 0 )
+ {
+ if ( treeState.compareAndSet(TreeState.LATENT,
TreeState.STARTED) )
+ {
+ publishEvent(TreeCacheEvent.Type.INITIALIZED);
+ }
+ }
+ }
+ }
+
+ private enum TreeState
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
+ * Determines when to publish the initialized event.
+ */
+ private final AtomicLong outstandingOps = new AtomicLong(0);
+
+ private final TreeNode root;
+ private final CuratorFramework client;
+ private final CloseableExecutorService executorService;
+ private final boolean cacheData;
+ private final boolean dataIsCompressed;
+ private final ListenerContainer<TreeCacheListener> listeners = new
ListenerContainer<TreeCacheListener>();
+ private final AtomicReference<TreeState> treeState = new
AtomicReference<TreeState>(TreeState.LATENT);
+
+ private final ConnectionStateListener connectionStateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState
newState)
+ {
+ handleStateChange(newState);
+ }
+ };
+
+ private static final ThreadFactory defaultThreadFactory =
ThreadUtils.newThreadFactory("TreeCache");
+
+ /**
+ * @param client the client
+ * @param path path to watch
+ * @param cacheData if true, node contents are cached in addition to
the stat
+ */
+ public TreeCache(CuratorFramework client, String path, boolean
cacheData)
+ {
+ this(client, path, cacheData, false, new
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory),
true));
+ }
+
+ /**
+ * @param client the client
+ * @param path path to watch
+ * @param cacheData if true, node contents are cached in addition
to the stat
+ * @param threadFactory factory to use when creating internal threads
+ */
+ public TreeCache(CuratorFramework client, String path, boolean
cacheData, ThreadFactory threadFactory)
+ {
+ this(client, path, cacheData, false, new
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory),
true));
+ }
+
+ /**
+ * @param client the client
+ * @param path path to watch
+ * @param cacheData if true, node contents are cached in
addition to the stat
+ * @param dataIsCompressed if true, data in the path is compressed
+ * @param threadFactory factory to use when creating internal
threads
+ */
+ public TreeCache(CuratorFramework client, String path, boolean
cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
+ {
+ this(client, path, cacheData, dataIsCompressed, new
CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory),
true));
+ }
+
+ /**
+ * @param client the client
+ * @param path path to watch
+ * @param cacheData if true, node contents are cached in
addition to the stat
+ * @param dataIsCompressed if true, data in the path is compressed
+ * @param executorService ExecutorService to use for the TreeCache's
background thread
+ */
+ public TreeCache(CuratorFramework client, String path, boolean
cacheData, boolean dataIsCompressed, final ExecutorService executorService)
+ {
+ this(client, path, cacheData, dataIsCompressed, new
CloseableExecutorService(executorService));
+ }
+
+ /**
+ * @param client the client
+ * @param path path to watch
+ * @param cacheData if true, node contents are cached in
addition to the stat
+ * @param dataIsCompressed if true, data in the path is compressed
+ * @param executorService Closeable ExecutorService to use for the
TreeCache's background thread
+ */
+ public TreeCache(CuratorFramework client, String path, boolean
cacheData, boolean dataIsCompressed, final CloseableExecutorService
executorService)
+ {
+ this.root = new TreeNode(path, null);
+ this.client = client;
+ this.cacheData = cacheData;
+ this.dataIsCompressed = dataIsCompressed;
+ this.executorService = executorService;
+ }
+
+ /**
+ * Start the cache. The cache is not started automatically. You must
call this method.
+ *
+ * @throws Exception errors
+ */
+ public void start() throws Exception
+ {
+
client.getConnectionStateListenable().addListener(connectionStateListener);
+ root.wasCreated();
+ }
+
+ /**
+ * Close/end the cache
+ *
+ * @throws java.io.IOException errors
+ */
+ @Override
+ public void close() throws IOException
+ {
+ if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) )
+ {
+
client.getConnectionStateListenable().removeListener(connectionStateListener);
+ listeners.clear();
+ executorService.close();
+ try
+ {
+ root.wasDeleted();
+ }
+ catch ( Exception e )
+ {
+ handleException(e);
+ }
+ }
+ }
+
+ /**
+ * Return the cache listenable
+ *
+ * @return listenable
+ */
+ public ListenerContainer<TreeCacheListener> getListenable()
+ {
+ return listeners;
+ }
+
+ private TreeNode find(String fullPath)
+ {
+ if ( !fullPath.startsWith(root.path) )
+ {
+ return null;
+ }
+
+ TreeNode current = root;
+ if ( fullPath.length() > root.path.length() )
+ {
+ List<String> split =
ZKPaths.split(fullPath.substring(root.path.length()));
+ for ( String part : split )
+ {
+ ConcurrentMap<String, TreeNode> map =
current.children.get();
+ if ( map == null )
+ {
+ return null;
+ }
+ current = map.get(part);
+ if ( current == null )
+ {
+ return null;
+ }
+ }
+ }
+ return current;
+ }
+
+ /**
+ * Return the current set of children. There are no guarantees of
accuracy. This is
+ * merely the most recent view of the data. The data is returned in
sorted order. If there is
--- End diff --
I went ahead and changed this to return `SortedMap<String, ChildData>`, so
that it provides both the names and the data of all current children. I will
add a separate accessor that returns an entire subtree, so I can document
that it might be expensive.
> Recursive Node Cache
> --------------------
>
> Key: CURATOR-33
> URL: https://issues.apache.org/jira/browse/CURATOR-33
> Project: Apache Curator
> Issue Type: Improvement
> Components: Recipes
> Reporter: John Vines
> Assignee: Jordan Zimmerman
> Fix For: TBD
>
> Attachments: CURATOR-33.2.patch, CURATOR-33.patch
>
>
> Currently the PathChildrenCache will trigger listen events for all children
> at the given node. However, it would be useful to have a cache that would
> trigger listen events for the entire hierarchy below the given node.
--
This message was sent by Atlassian JIRA
(v6.2#6252)