[
https://issues.apache.org/jira/browse/CURATOR-33?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14081614#comment-14081614
]
ASF GitHub Bot commented on CURATOR-33:
---------------------------------------
Github user dragonsinth commented on a diff in the pull request:
https://github.com/apache/curator/pull/17#discussion_r15674181
--- Diff:
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
---
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Maps;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class
+ * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can
+ * register a listener that will get notified when changes occur.</p>
+ * <p></p>
+ * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+ * be prepared for false-positives and false-negatives. Additionally, always use the version number
+ * when updating data to avoid overwriting another process' change.</p>
+ */
+public class TreeCache implements Closeable
+{
+ private static final Logger LOG =
LoggerFactory.getLogger(TreeCache.class);
+
+ private enum NodeState
+ {
+ PENDING, LIVE, DEAD
+ }
+
+ final class TreeNode implements Watcher, BackgroundCallback
+ {
+ private final AtomicReference<NodeState> nodeState = new
AtomicReference<NodeState>(NodeState.PENDING);
+ private final String path;
+ private final TreeNode parent;
+ private final AtomicReference<Stat> stat = new
AtomicReference<Stat>();
+ private final AtomicReference<byte[]> data = new
AtomicReference<byte[]>();
+ private final AtomicReference<ConcurrentMap<String, TreeNode>>
children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
+
+ TreeNode(String path, TreeNode parent)
+ {
+ this.path = path;
+ this.parent = parent;
+ }
+
+ private void refreshChildren() throws Exception
+ {
+ outstandingOps.incrementAndGet();
+
client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+ }
+
+ private void refreshData() throws Exception
+ {
+ outstandingOps.incrementAndGet();
+ if ( dataIsCompressed )
+ {
+
client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
+ }
+ else
+ {
+
client.getData().usingWatcher(this).inBackground(this).forPath(path);
+ }
+ }
+
+ private void wasReconnected() throws Exception
+ {
+ refreshData();
+ refreshChildren();
+ ConcurrentMap<String, TreeNode> childMap = children.get();
+ if ( childMap != null )
+ {
+ for ( TreeNode child : childMap.values() )
+ {
+ child.wasReconnected();
+ }
+ }
+ }
+
+ private void wasCreated() throws Exception
+ {
+ refreshData();
+ refreshChildren();
+ }
+
+ private void wasDeleted() throws Exception
+ {
+ stat.set(null);
+ data.set(null);
+ client.clearWatcherReferences(this);
+ ConcurrentMap<String, TreeNode> childMap =
children.getAndSet(null);
+ if ( childMap != null )
+ {
+ ArrayList<TreeNode> childCopy = new
ArrayList<TreeNode>(childMap.values());
+ childMap.clear();
+ for ( TreeNode child : childCopy )
+ {
+ child.wasDeleted();
+ }
+ }
+
+ if ( treeState.get() == TreeState.CLOSED )
+ {
+ return;
+ }
+
+ if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) )
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path);
+ }
+
+ if ( parent == null )
+ {
+ // Root node; use an exist query to watch for existence.
+
client.checkExists().usingWatcher(this).inBackground().forPath(path);
+ }
+ else
+ {
+ // Remove from parent if we're currently a child
+ ConcurrentMap<String, TreeNode> parentChildMap =
parent.children.get();
+ if ( parentChildMap != null )
+ {
+ parentChildMap.remove(ZKPaths.getNodeFromPath(path),
this);
+ }
+ }
+ }
+
+ @Override
+ public void process(WatchedEvent event)
+ {
+ try
+ {
+ switch ( event.getType() )
+ {
+ case NodeCreated:
+ assert parent == null;
+ wasCreated();
+ break;
+ case NodeChildrenChanged:
+ refreshChildren();
+ break;
+ case NodeDataChanged:
+ refreshData();
+ break;
+ case NodeDeleted:
+ wasDeleted();
+ break;
+ }
+ }
+ catch ( Exception e )
+ {
+ handleException(e);
+ }
+ }
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent
event) throws Exception
+ {
+ switch ( event.getType() )
+ {
+ case EXISTS:
+ // TODO: should only happen for root node
+ if ( event.getResultCode() ==
KeeperException.Code.OK.intValue() )
+ {
+ nodeState.compareAndSet(NodeState.DEAD,
NodeState.PENDING);
+ wasCreated();
+ }
+ else if ( event.getResultCode() ==
KeeperException.Code.NONODE.intValue() )
+ {
+ wasDeleted();
+ }
+ break;
+ case CHILDREN:
+ if ( event.getResultCode() ==
KeeperException.Code.OK.intValue() )
+ {
+ stat.set(event.getStat());
+
+ if ( event.getChildren().isEmpty() )
+ {
+ break;
+ }
+
+ ConcurrentMap<String, TreeNode> childMap =
children.get();
+ if ( childMap == null )
+ {
+ childMap = Maps.newConcurrentMap();
+ if ( !children.compareAndSet(null, childMap) )
+ {
+ childMap = children.get();
+ }
+ }
+
+ // Present new children in sorted order for test
determinism.
+ List<String> newChildren = new ArrayList<String>();
+ for ( String child : event.getChildren() )
+ {
+ if ( !childMap.containsKey(child) )
+ {
+ newChildren.add(child);
+ }
+ }
+
+ Collections.sort(newChildren);
+ for ( String child : newChildren )
+ {
+ String fullPath = ZKPaths.makePath(path, child);
+ TreeNode node = new TreeNode(fullPath, this);
+ if ( childMap.putIfAbsent(child, node) == null )
+ {
+ node.wasCreated();
+ }
+ }
+ }
+ else if ( event.getResultCode() ==
KeeperException.Code.NONODE.intValue() )
+ {
+ wasDeleted();
+ }
+ break;
+ case GET_DATA:
+ if ( event.getResultCode() ==
KeeperException.Code.OK.intValue() )
+ {
+ Stat oldStat = stat.getAndSet(event.getStat());
+ if ( cacheData )
+ {
+ data.set(event.getData());
+ }
+
+ if ( nodeState.compareAndSet(NodeState.PENDING,
NodeState.LIVE) )
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_ADDED, new
ChildData(event.getPath(), event.getStat(), event.getData()));
+ }
+ else if ( oldStat.getMzxid() !=
event.getStat().getMzxid() )
+ {
+ publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new
ChildData(event.getPath(), event.getStat(), event.getData()));
+ }
+ }
+ else if ( event.getResultCode() ==
KeeperException.Code.NONODE.intValue() )
+ {
+ wasDeleted();
+ }
+ break;
+ default:
+ handleException(new Exception(String.format("Unknown event
%s", event)));
+ }
+
+ if ( outstandingOps.decrementAndGet() == 0 )
+ {
+ if ( treeState.compareAndSet(TreeState.LATENT,
TreeState.STARTED) )
+ {
+ publishEvent(TreeCacheEvent.Type.INITIALIZED);
+ }
+ }
+ }
+ }
+
+ private enum TreeState
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
+ * Determines when to publish the initialized event.
+ */
+ private final AtomicLong outstandingOps = new AtomicLong(0);
--- End diff --
Good point. However, we do continue tracking this even after we're
initialized. Maybe a clearer javadoc would help? Something like:
"Tracks the number of outstanding background requests in flight. The first
time this count reaches 0, we publish the initialized event."
> Recursive Node Cache
> --------------------
>
> Key: CURATOR-33
> URL: https://issues.apache.org/jira/browse/CURATOR-33
> Project: Apache Curator
> Issue Type: Improvement
> Components: Recipes
> Reporter: John Vines
> Assignee: Jordan Zimmerman
> Fix For: TBD
>
> Attachments: CURATOR-33.2.patch, CURATOR-33.patch
>
>
> Currently the PathChildrenCache will trigger listen events for all children
> at the given node. However, it would be useful to have a cache that would
> trigger listen events for the entire hierarchy below the given node.
--
This message was sent by Atlassian JIRA
(v6.2#6252)