[ https://issues.apache.org/jira/browse/CURATOR-33?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14080871#comment-14080871 ]
ASF GitHub Bot commented on CURATOR-33: --------------------------------------- Github user Randgalt commented on a diff in the pull request: https://github.com/apache/curator/pull/17#discussion_r15641822 --- Diff: curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java --- @@ -0,0 +1,605 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator.framework.recipes.cache; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Maps; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class + * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can + * register a listener that will get notified when changes occur.</p> + * <p></p> + * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must + * be prepared for false-positives and false-negatives. 
Additionally, always use the version number + * when updating data to avoid overwriting another process' change.</p> + */ +public class TreeCache implements Closeable +{ + private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class); + + private enum NodeState + { + PENDING, LIVE, DEAD + } + + final class TreeNode implements Watcher, BackgroundCallback + { + private final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING); + private final String path; + private final TreeNode parent; + private final AtomicReference<Stat> stat = new AtomicReference<Stat>(); + private final AtomicReference<byte[]> data = new AtomicReference<byte[]>(); + private final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>(); + + TreeNode(String path, TreeNode parent) + { + this.path = path; + this.parent = parent; + } + + private void refreshChildren() throws Exception + { + outstandingOps.incrementAndGet(); + client.getChildren().usingWatcher(this).inBackground(this).forPath(path); + } + + private void refreshData() throws Exception + { + outstandingOps.incrementAndGet(); + if ( dataIsCompressed ) + { + client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path); + } + else + { + client.getData().usingWatcher(this).inBackground(this).forPath(path); + } + } + + private void wasReconnected() throws Exception + { + refreshData(); + refreshChildren(); + ConcurrentMap<String, TreeNode> childMap = children.get(); + if ( childMap != null ) + { + for ( TreeNode child : childMap.values() ) + { + child.wasReconnected(); + } + } + } + + private void wasCreated() throws Exception + { + refreshData(); + refreshChildren(); + } + + private void wasDeleted() throws Exception + { + stat.set(null); + data.set(null); + client.clearWatcherReferences(this); + ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null); + if ( childMap != null ) + { + 
ArrayList<TreeNode> childCopy = new ArrayList<TreeNode>(childMap.values()); + childMap.clear(); + for ( TreeNode child : childCopy ) + { + child.wasDeleted(); + } + } + + if ( treeState.get() == TreeState.CLOSED ) + { + return; + } + + if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) ) + { + publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path); + } + + if ( parent == null ) + { + // Root node; use an exist query to watch for existence. + client.checkExists().usingWatcher(this).inBackground().forPath(path); + } + else + { + // Remove from parent if we're currently a child + ConcurrentMap<String, TreeNode> parentChildMap = parent.children.get(); + if ( parentChildMap != null ) + { + parentChildMap.remove(ZKPaths.getNodeFromPath(path), this); + } + } + } + + @Override + public void process(WatchedEvent event) + { + try + { + switch ( event.getType() ) + { + case NodeCreated: + assert parent == null; + wasCreated(); + break; + case NodeChildrenChanged: + refreshChildren(); + break; + case NodeDataChanged: + refreshData(); + break; + case NodeDeleted: + wasDeleted(); + break; + } + } + catch ( Exception e ) + { + handleException(e); + } + } + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + switch ( event.getType() ) + { + case EXISTS: + // TODO: should only happen for root node + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING); + wasCreated(); + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + case CHILDREN: + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + stat.set(event.getStat()); + + if ( event.getChildren().isEmpty() ) + { + break; + } + + ConcurrentMap<String, TreeNode> childMap = children.get(); + if ( childMap == null ) + { + childMap = Maps.newConcurrentMap(); + if ( !children.compareAndSet(null, childMap) ) + { + 
childMap = children.get(); + } + } + + // Present new children in sorted order for test determinism. + List<String> newChildren = new ArrayList<String>(); + for ( String child : event.getChildren() ) + { + if ( !childMap.containsKey(child) ) + { + newChildren.add(child); + } + } + + Collections.sort(newChildren); + for ( String child : newChildren ) + { + String fullPath = ZKPaths.makePath(path, child); + TreeNode node = new TreeNode(fullPath, this); + if ( childMap.putIfAbsent(child, node) == null ) + { + node.wasCreated(); + } + } + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + case GET_DATA: + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + Stat oldStat = stat.getAndSet(event.getStat()); + if ( cacheData ) + { + data.set(event.getData()); + } + + if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) ) + { + publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), event.getStat(), event.getData())); + } + else if ( oldStat.getMzxid() != event.getStat().getMzxid() ) + { + publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), event.getStat(), event.getData())); + } + } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + wasDeleted(); + } + break; + default: + handleException(new Exception(String.format("Unknown event %s", event))); + } + + if ( outstandingOps.decrementAndGet() == 0 ) + { + if ( treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED) ) + { + publishEvent(TreeCacheEvent.Type.INITIALIZED); + } + } + } + } + + private enum TreeState + { + LATENT, + STARTED, + CLOSED + } + + /** + * Determines when to publish the initialized event. 
+ */ + private final AtomicLong outstandingOps = new AtomicLong(0); + + private final TreeNode root; + private final CuratorFramework client; + private final CloseableExecutorService executorService; + private final boolean cacheData; + private final boolean dataIsCompressed; + private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>(); + private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT); + + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + handleStateChange(newState); + } + }; + + private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache"); + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData) + { + this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param threadFactory factory to use when creating internal threads + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) + { + this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param threadFactory factory to use when creating internal threads + */ + public TreeCache(CuratorFramework client, 
String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) + { + this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param executorService ExecutorService to use for the TreeCache's background thread + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService) + { + this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); + } + + /** + * @param client the client + * @param path path to watch + * @param cacheData if true, node contents are cached in addition to the stat + * @param dataIsCompressed if true, data in the path is compressed + * @param executorService Closeable ExecutorService to use for the TreeCache's background thread + */ + public TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) + { + this.root = new TreeNode(path, null); + this.client = client; + this.cacheData = cacheData; + this.dataIsCompressed = dataIsCompressed; + this.executorService = executorService; + } + + /** + * Start the cache. The cache is not started automatically. You must call this method. 
+ * + * @throws Exception errors + */ + public void start() throws Exception + { + client.getConnectionStateListenable().addListener(connectionStateListener); + root.wasCreated(); + } + + /** + * Close/end the cache + * + * @throws java.io.IOException errors + */ + @Override + public void close() throws IOException + { + if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) ) + { + client.getConnectionStateListenable().removeListener(connectionStateListener); + listeners.clear(); + executorService.close(); + try + { + root.wasDeleted(); + } + catch ( Exception e ) + { + handleException(e); + } + } + } + + /** + * Return the cache listenable + * + * @return listenable + */ + public ListenerContainer<TreeCacheListener> getListenable() + { + return listeners; + } + + private TreeNode find(String fullPath) + { + if ( !fullPath.startsWith(root.path) ) + { + return null; + } + + TreeNode current = root; + if ( fullPath.length() > root.path.length() ) + { + List<String> split = ZKPaths.split(fullPath.substring(root.path.length())); + for ( String part : split ) + { + ConcurrentMap<String, TreeNode> map = current.children.get(); + if ( map == null ) + { + return null; + } + current = map.get(part); + if ( current == null ) + { + return null; + } + } + } + return current; + } + + /** + * Return the current set of children. There are no guarantees of accuracy. This is + * merely the most recent view of the data. The data is returned in sorted order. If there is + * no child with that path, <code>null</code> is returned. 
+ * + * @param fullPath full path to the node to check + * @return a possibly-empty list of children if the node is alive, or null + */ + public SortedSet<String> getCurrentChildren(String fullPath) + { + TreeNode node = find(fullPath); + if ( node == null || node.nodeState.get() != NodeState.LIVE ) + { + return null; + } + ConcurrentMap<String, TreeNode> map = node.children.get(); + SortedSet<String> result; + if ( map == null ) + { + result = ImmutableSortedSet.of(); + } + else + { + result = ImmutableSortedSet.copyOf(map.keySet()); + } + + // Double-check liveness after retrieving children. + return node.nodeState.get() == NodeState.LIVE ? result : null; + } + + /** + * Return the current data for the given path. There are no guarantees of accuracy. This is + * merely the most recent view of the data. If there is no child with that path, + * <code>null</code> is returned. + * + * @param fullPath full path to the node to check + * @return data if the node is alive, or null + */ + public ChildData getCurrentData(String fullPath) + { + TreeNode node = find(fullPath); + if ( node == null || node.nodeState.get() != NodeState.LIVE ) + { + return null; + } + ChildData result = new ChildData(node.path, node.stat.get(), node.data.get()); + // Double-check liveness after retrieving stat / data. + return node.nodeState.get() == NodeState.LIVE ? result : null; + } + + void callListeners(final TreeCacheEvent event) --- End diff -- can be private > Recursive Node Cache > -------------------- > > Key: CURATOR-33 > URL: https://issues.apache.org/jira/browse/CURATOR-33 > Project: Apache Curator > Issue Type: Improvement > Components: Recipes > Reporter: John Vines > Fix For: awaiting-response > > Attachments: CURATOR-33.2.patch, CURATOR-33.patch > > > Currently the PathChildrenCache will trigger listen events for all children > at the given node. However, it would be useful to have a cache that would > trigger listen events for the entire hierarchy below the given node. 
-- This message was sent by Atlassian JIRA (v6.2#6252)