[ https://issues.apache.org/jira/browse/CURATOR-33?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14081604#comment-14081604 ]
ASF GitHub Bot commented on CURATOR-33: --------------------------------------- Github user dragonsinth commented on a diff in the pull request: https://github.com/apache/curator/pull/17#discussion_r15674042 --- Diff: curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java --- @@ -0,0 +1,605 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Maps; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class + * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can + * register a listener that will get notified when changes occur.</p> + * <p></p> + * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must + * be prepared for false-positives and false-negatives. Additionally, always use the version number + * when updating data to avoid overwriting another process' change.</p> + */ +public class TreeCache implements Closeable +{ + private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class); + + private enum NodeState + { + PENDING, LIVE, DEAD + } + + final class TreeNode implements Watcher, BackgroundCallback + { + private final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING); + private final String path; + private final TreeNode parent; + private final AtomicReference<Stat> stat = new AtomicReference<Stat>(); + private final AtomicReference<byte[]> data = new AtomicReference<byte[]>(); + private final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>(); + + TreeNode(String path, TreeNode parent) + { + this.path = path; + this.parent = parent; + } + + private void refreshChildren() throws Exception + { + outstandingOps.incrementAndGet(); + client.getChildren().usingWatcher(this).inBackground(this).forPath(path); + } + + private void refreshData() throws Exception + { + outstandingOps.incrementAndGet(); + if ( dataIsCompressed ) + { + client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path); + } + else + { + client.getData().usingWatcher(this).inBackground(this).forPath(path); + } + } + + private void wasReconnected() throws Exception + { + refreshData(); + refreshChildren(); + ConcurrentMap<String, TreeNode> childMap = children.get(); + if ( childMap != null ) + { + for ( TreeNode child : childMap.values() ) + { + child.wasReconnected(); + } + } + } + + private void wasCreated() throws Exception + { + refreshData(); + refreshChildren(); + } + + private void wasDeleted() throws Exception + { + stat.set(null); + data.set(null); + client.clearWatcherReferences(this); + ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null); + if ( childMap != null ) + { + ArrayList<TreeNode> childCopy = new ArrayList<TreeNode>(childMap.values()); + childMap.clear(); + for ( TreeNode child : childCopy ) + { + child.wasDeleted(); + } + } + + if ( treeState.get() == TreeState.CLOSED ) + { + return; + } + + if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) ) + { + publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path); + } + + if ( parent == null ) + { + // Root node; use an exist query to watch for existence. + client.checkExists().usingWatcher(this).inBackground().forPath(path); + } + else + { + // Remove from parent if we're currently a child + ConcurrentMap<String, TreeNode> parentChildMap = parent.children.get(); + if ( parentChildMap != null ) + { + parentChildMap.remove(ZKPaths.getNodeFromPath(path), this); + } + } + } + + @Override + public void process(WatchedEvent event) + { + try + { + switch ( event.getType() ) + { + case NodeCreated: + assert parent == null; + wasCreated(); + break; + case NodeChildrenChanged: + refreshChildren(); + break; + case NodeDataChanged: + refreshData(); + break; + case NodeDeleted: + wasDeleted(); + break; + } + } + catch ( Exception e ) + { + handleException(e); + } + } + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + switch ( event.getType() ) + { + case EXISTS: + // TODO: should only happen for root node + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING); + wasCreated(); + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + case CHILDREN: + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + stat.set(event.getStat()); + + if ( event.getChildren().isEmpty() ) + { + break; + } + + ConcurrentMap<String, TreeNode> childMap = children.get(); + if ( childMap == null ) + { + childMap = Maps.newConcurrentMap(); + if ( !children.compareAndSet(null, childMap) ) + { + childMap = children.get(); + } + } + + // Present new children in sorted order for test determinism. + List<String> newChildren = new ArrayList<String>(); + for ( String child : event.getChildren() ) + { + if ( !childMap.containsKey(child) ) + { + newChildren.add(child); + } + } + + Collections.sort(newChildren); + for ( String child : newChildren ) + { + String fullPath = ZKPaths.makePath(path, child); + TreeNode node = new TreeNode(fullPath, this); + if ( childMap.putIfAbsent(child, node) == null ) + { + node.wasCreated(); + } + } + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + case GET_DATA: + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + Stat oldStat = stat.getAndSet(event.getStat()); + if ( cacheData ) + { + data.set(event.getData()); + } + + if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) ) + { + publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), event.getStat(), event.getData())); + } + else if ( oldStat.getMzxid() != event.getStat().getMzxid() ) + { + publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), event.getStat(), event.getData())); + } + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + default: + handleException(new Exception(String.format("Unknown event %s", event))); + } + + if ( outstandingOps.decrementAndGet() == 0 ) + { + if ( treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED) ) --- End diff -- To be clear, you want to enforce that start() cannot be called multiple times? > Recursive Node Cache > -------------------- > > Key: CURATOR-33 > URL: https://issues.apache.org/jira/browse/CURATOR-33 > Project: Apache Curator > Issue Type: Improvement > Components: Recipes > Reporter: John Vines > Assignee: Jordan Zimmerman > Fix For: TBD > > Attachments: CURATOR-33.2.patch, CURATOR-33.patch > > > Currently the PathChildrenCache will trigger listen events for all children > at the given node. However, it would be useful to have a cache that would > trigger listen events for the entire hierarchy below the given node. -- This message was sent by Atlassian JIRA (v6.2#6252)