Repository: curator Updated Branches: refs/heads/CURATOR-447 [created] 970475f03
remove nodeState field, improve thread safety of cache updates also simplify by having TreeNode extend AtomicReference<ChildData> Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1c7be856 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1c7be856 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1c7be856 Branch: refs/heads/CURATOR-447 Commit: 1c7be856fb514015286855f1ad9fd89069c32906 Parents: a9ab77a Author: nickhill <nickh...@us.ibm.com> Authored: Sun Mar 26 13:19:40 2017 -0700 Committer: nickhill <nickh...@us.ibm.com> Committed: Tue Jan 2 20:31:45 2018 +0000 ---------------------------------------------------------------------- .../framework/recipes/cache/TreeCache.java | 119 ++++++++----------- 1 file changed, 49 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/1c7be856/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index 9bf2789..c908ccd 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -196,25 +196,18 @@ public class TreeCache implements Closeable return new Builder(client, path); } - private enum NodeState - { - PENDING, LIVE, DEAD + private static final ChildData DEAD = new ChildData("/",null,null); + + private static boolean isLive(ChildData cd) { + return cd != null && cd != DEAD; } - private static final AtomicReferenceFieldUpdater<TreeNode, NodeState> nodeStateUpdater = - AtomicReferenceFieldUpdater.newUpdater(TreeNode.class, NodeState.class, "nodeState"); - - private static final AtomicReferenceFieldUpdater<TreeNode, ChildData> childDataUpdater = - AtomicReferenceFieldUpdater.newUpdater(TreeNode.class, ChildData.class, "childData"); - - private static final AtomicReferenceFieldUpdater<TreeNode, ConcurrentMap> childrenUpdater = - AtomicReferenceFieldUpdater.newUpdater(TreeNode.class, ConcurrentMap.class, "children"); + private static final AtomicReferenceFieldUpdater<TreeNode, ConcurrentMap<String,TreeNode>> childrenUpdater = + (AtomicReferenceFieldUpdater)AtomicReferenceFieldUpdater.newUpdater(TreeNode.class, ConcurrentMap.class, "children"); - private final class TreeNode implements Watcher, BackgroundCallback + private final class TreeNode extends AtomicReference<ChildData> implements Watcher, BackgroundCallback { - volatile NodeState nodeState = NodeState.PENDING; - volatile ChildData childData; final TreeNode parent; final String path; volatile ConcurrentMap<String, TreeNode> children; @@ -297,7 +290,11 @@ public class TreeCache implements Closeable void wasDeleted() throws Exception { - ChildData oldChildData = childDataUpdater.getAndSet(this, null); + ChildData oldChildData; + do { + if( (oldChildData = get()) == DEAD ) return; + } while( !compareAndSet(oldChildData, DEAD) ); + ConcurrentMap<String, TreeNode> childMap = childrenUpdater.getAndSet(this,null); if ( childMap != null ) { @@ -314,8 +311,7 @@ public class TreeCache implements Closeable return; } - NodeState oldState = nodeStateUpdater.getAndSet(this, NodeState.DEAD); - if ( oldState == NodeState.LIVE ) + if ( isLive(oldChildData) ) { publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData); } @@ -377,19 +373,20 @@ public class TreeCache implements Closeable Preconditions.checkState(parent == null, "unexpected EXISTS on non-root node"); if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { - nodeStateUpdater.compareAndSet(this, NodeState.DEAD, NodeState.PENDING); + compareAndSet(DEAD, null); wasCreated(); } break; case CHILDREN: if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { - ChildData oldChildData = childData; - if ( oldChildData != null && oldChildData.getStat().getMzxid() == newStat.getMzxid() ) + ChildData oldChildData = get(); + //TODO consider doing update of cversion, pzxid, numChildren only + if ( isLive(oldChildData) && oldChildData.getStat().getMzxid() == newStat.getMzxid() ) { // Only update stat if mzxid is same, otherwise we might obscure // GET_DATA event updates. - childDataUpdater.compareAndSet(this, oldChildData, new ChildData(oldChildData.getPath(), newStat, oldChildData.getData())); + compareAndSet(oldChildData, new ChildData(oldChildData.getPath(), newStat, oldChildData.getData())); } if ( event.getChildren().isEmpty() ) @@ -397,14 +394,11 @@ public class TreeCache implements Closeable break; } - ConcurrentMap<String, TreeNode> childMap = children; - if ( childMap == null ) + ConcurrentMap<String, TreeNode> childMap; + while ( (childMap = children) == null ) { childMap = Maps.newConcurrentMap(); - if ( !childrenUpdater.compareAndSet(this, null, childMap) ) - { - childMap = children; - } + if ( childrenUpdater.compareAndSet(this, null, childMap) ) break; } // Present new children in sorted order for test determinism. @@ -436,41 +430,28 @@ public class TreeCache implements Closeable case GET_DATA: if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { - ChildData toPublish = new ChildData(event.getPath(), newStat, event.getData()); - ChildData oldChildData; - if ( cacheData ) - { - oldChildData = childDataUpdater.getAndSet(this, toPublish); - } - else - { - oldChildData = childDataUpdater.getAndSet(this, new ChildData(event.getPath(), newStat, null)); - } - - boolean added; - if (parent == null) { - // We're the singleton root. - added = nodeStateUpdater.getAndSet(this, NodeState.LIVE) != NodeState.LIVE; - } else { - added = nodeStateUpdater.compareAndSet(this, NodeState.PENDING, NodeState.LIVE); - if (!added) { - // Ordinary nodes are not allowed to transition from dead -> live; - // make sure this isn't a delayed response that came in after death. - if (nodeState != NodeState.LIVE) { - return; - } + String eventPath = event.getPath(); + ChildData toPublish = new ChildData(eventPath, newStat, event.getData()); + ChildData toUpdate = cacheData ? toPublish : new ChildData(eventPath, newStat, null); + for(;;) { + final ChildData oldChildData = get(); + if ( (isLive(oldChildData) && newStat.getMzxid() <= oldChildData.getStat().getMzxid()) + // Ordinary nodes are not allowed to transition from dead -> live; + // make sure this isn't a delayed response that came in after death. + || (parent != null && oldChildData == DEAD) ) { + break; } - } - - if ( added ) - { - publishEvent(TreeCacheEvent.Type.NODE_ADDED, toPublish); - } - else - { - if ( oldChildData == null || oldChildData.getStat().getMzxid() != newStat.getMzxid() ) + if ( compareAndSet(oldChildData, toUpdate) ) { - publishEvent(TreeCacheEvent.Type.NODE_UPDATED, toPublish); + if ( !isLive(oldChildData) ) + { + publishEvent(TreeCacheEvent.Type.NODE_ADDED, toPublish); + } + else if ( oldChildData.getStat().getMzxid() != newStat.getMzxid() ) + { + publishEvent(TreeCacheEvent.Type.NODE_UPDATED, toPublish); + } + break; } } } @@ -631,7 +612,7 @@ public class TreeCache implements Closeable } /** - * Allows catching unhandled errors in asynchornous operations. + * Allows catching unhandled errors in asynchronous operations. * * TODO: consider making public. */ @@ -687,7 +668,7 @@ public class TreeCache implements Closeable public Map<String, ChildData> getCurrentChildren(String fullPath) { TreeNode node = find(fullPath); - if ( node == null || node.nodeState != NodeState.LIVE ) + if ( node == null || !isLive(node.get()) ) { return null; } @@ -703,9 +684,8 @@ public class TreeCache implements Closeable for ( Map.Entry<String, TreeNode> entry : map.entrySet() ) { TreeNode childNode = entry.getValue(); - ChildData childData = childNode.childData; - // Double-check liveness after retreiving data. - if ( childData != null && childNode.nodeState == NodeState.LIVE ) + ChildData childData = childNode.get(); + if ( isLive(childData) ) { builder.put(entry.getKey(), childData); } @@ -714,7 +694,7 @@ public class TreeCache implements Closeable } // Double-check liveness after retreiving children. - return node.nodeState == NodeState.LIVE ? result : null; + return isLive(node.get()) ? result : null; } /** @@ -728,13 +708,12 @@ public class TreeCache implements Closeable public ChildData getCurrentData(String fullPath) { TreeNode node = find(fullPath); - if ( node == null || node.nodeState != NodeState.LIVE ) + if ( node == null ) { return null; } - ChildData result = node.childData; - // Double-check liveness after retreiving data. - return node.nodeState == NodeState.LIVE ? result : null; + ChildData result = node.get(); + return result != DEAD ? result : null; } private void callListeners(final TreeCacheEvent event)