Repository: curator Updated Branches: refs/heads/CURATOR-33 03bc3bee0 -> 2aea22a83
http://git-wip-us.apache.org/repos/asf/curator/blob/2aea22a8/CURATOR-33.patch ---------------------------------------------------------------------- diff --git a/CURATOR-33.patch b/CURATOR-33.patch deleted file mode 100644 index dbcb9d4..0000000 --- a/CURATOR-33.patch +++ /dev/null @@ -1,2664 +0,0 @@ -diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java -index d66f7f3..03b169e 100644 ---- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java -+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java -@@ -1,32 +1,28 @@ - /** -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, -- * software distributed under the License is distributed on an -- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -- * KIND, either express or implied. See the License for the -- * specific language governing permissions and limitations -- * under the License. -+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license -+ * agreements. See the NOTICE file distributed with this work for additional information regarding -+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the -+ * "License"); you may not use this file except in compliance with the License. You may obtain a -+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable -+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" -+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License -+ * for the specific language governing permissions and limitations under the License. - */ - - package org.apache.curator.framework.recipes.cache; - --import com.google.common.annotations.VisibleForTesting; --import com.google.common.base.Function; --import com.google.common.base.Preconditions; --import com.google.common.base.Predicate; --import com.google.common.collect.ImmutableList; --import com.google.common.collect.Lists; --import com.google.common.collect.Maps; --import com.google.common.collect.Sets; -+import java.io.Closeable; -+import java.io.IOException; -+import java.util.List; -+import java.util.Map; -+import java.util.Set; -+import java.util.concurrent.ConcurrentMap; -+import java.util.concurrent.Exchanger; -+import java.util.concurrent.ExecutorService; -+import java.util.concurrent.Executors; -+import java.util.concurrent.ThreadFactory; -+import java.util.concurrent.atomic.AtomicReference; -+ - import org.apache.curator.framework.CuratorFramework; - import org.apache.curator.framework.api.BackgroundCallback; - import org.apache.curator.framework.api.CuratorEvent; -@@ -43,79 +39,71 @@ import org.apache.zookeeper.Watcher; - import org.apache.zookeeper.data.Stat; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; --import java.io.Closeable; --import java.io.IOException; --import java.util.List; --import java.util.Map; --import java.util.Set; --import java.util.concurrent.ConcurrentMap; --import java.util.concurrent.Exchanger; --import java.util.concurrent.ExecutorService; --import java.util.concurrent.Executors; --import java.util.concurrent.ThreadFactory; --import java.util.concurrent.atomic.AtomicReference; -+ -+import com.google.common.annotations.VisibleForTesting; -+import com.google.common.base.Function; -+import com.google.common.base.Preconditions; -+import com.google.common.base.Predicate; -+import com.google.common.collect.ImmutableList; -+import com.google.common.collect.Lists; -+import com.google.common.collect.Maps; -+import com.google.common.collect.Sets; - - /** -- * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class -- * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can -- * register a listener that will get notified when changes occur.</p> -+ * <p> -+ * A utility that attempts to keep all data from all children of a ZK path locally cached. This -+ * class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. -+ * You can register a listener that will get notified when changes occur. -+ * </p> - * <p/> -- * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must -- * be prepared for false-positives and false-negatives. Additionally, always use the version number -- * when updating data to avoid overwriting another process' change.</p> -+ * <p> -+ * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must be -+ * prepared for false-positives and false-negatives. Additionally, always use the version number -+ * when updating data to avoid overwriting another process' change. -+ * </p> - */ - @SuppressWarnings("NullableProblems") --public class PathChildrenCache implements Closeable --{ -+public class PathChildrenCache implements Closeable { -+ - private final Logger log = LoggerFactory.getLogger(getClass()); - private final CuratorFramework client; - private final String path; - private final CloseableExecutorService executorService; - private final boolean cacheData; -+ private final DescendantHandlingMode descendantHandlingMode; - private final boolean dataIsCompressed; - private final EnsurePath ensurePath; - private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>(); - private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap(); - private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>(); -- private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap()); -+ private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean> newConcurrentMap()); - private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); - -- private enum State -- { -- LATENT, -- STARTED, -- CLOSED -+ private enum State { -+ LATENT, STARTED, CLOSED - } - - private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null); - -- private final Watcher childrenWatcher = new Watcher() -- { -+ private final Watcher childrenWatcher = new Watcher() { -+ - @Override -- public void process(WatchedEvent event) -- { -+ public void process(WatchedEvent event) { - offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD)); - } - }; - -- private final Watcher dataWatcher = new Watcher() -- { -+ private final Watcher dataWatcher = new Watcher() { -+ - @Override -- public void process(WatchedEvent event) -- { -- try -- { -- if ( event.getType() == Event.EventType.NodeDeleted ) -- { -+ public void process(WatchedEvent event) { -+ try { -+ if (event.getType() == Event.EventType.NodeDeleted) { - remove(event.getPath()); -- } -- else if ( event.getType() == Event.EventType.NodeDataChanged ) -- { -+ } else if (event.getType() == Event.EventType.NodeDataChanged) { - offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath())); - } -- } -- catch ( Exception e ) -- { -+ } catch (Exception e) { - handleException(e); - } - } -@@ -124,98 +112,208 @@ public class PathChildrenCache implements Closeable - @VisibleForTesting - volatile Exchanger<Object> rebuildTestExchanger; - -- private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() -- { -+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { -+ - @Override -- public void stateChanged(CuratorFramework client, ConnectionState newState) -- { -+ public void stateChanged(CuratorFramework client, ConnectionState newState) { - handleStateChange(newState); - } - }; - private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache"); - - /** -- * @param client the client -- * @param path path to watch -- * @param mode caching mode -+ * Method of processing children of the root node. Whether all children under the root node -+ * should be considered or whether only the first level should be considered. -+ */ -+ public enum DescendantHandlingMode { -+ /** -+ * Only children of the root node will be considered by the cache. This is default behaviour -+ */ -+ DIRECT_DESCENDANTS_ONLY, -+ -+ /** -+ * The root nodes children, and its children etc. will all be considered by the cache. -+ */ -+ ALL_DESCENDANTS -+ } -+ -+ /** -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param mode -+ * caching mode - * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean)} instead - */ -+ @Deprecated - @SuppressWarnings("deprecation") -- public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode) -- { -- this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); -+ public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode) { -+ this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, -+ new CloseableExecutorService( -+ Executors.newSingleThreadExecutor(defaultThreadFactory), true)); - } - - /** -- * @param client the client -- * @param path path to watch -- * @param mode caching mode -- * @param threadFactory factory to use when creating internal threads -- * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} instead -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param mode -+ * caching mode -+ * @param threadFactory -+ * factory to use when creating internal threads -+ * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} -+ * instead - */ -+ @Deprecated - @SuppressWarnings("deprecation") -- public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory) -- { -- this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); -+ public PathChildrenCache(CuratorFramework client, -+ String path, -+ PathChildrenCacheMode mode, -+ ThreadFactory threadFactory) { -+ this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, -+ new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); - } - - /** -- * @param client the client -- * @param path path to watch -- * @param cacheData if true, node contents are cached in addition to the stat -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param cacheData -+ * if true, node contents are cached in addition to the stat - */ -- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) -- { -- this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); -+ public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) { -+ this(client, path, cacheData, false, new CloseableExecutorService( -+ Executors.newSingleThreadExecutor(defaultThreadFactory), true)); - } - - /** -- * @param client the client -- * @param path path to watch -- * @param cacheData if true, node contents are cached in addition to the stat -- * @param threadFactory factory to use when creating internal threads -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param cacheData -+ * if true, node contents are cached in addition to the stat -+ * @param descendantHandlingMode -+ * Mode defining if only descendants of the root node will be considered or whether -+ * the entire tree will be. - */ -- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) -- { -- this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); -+ public PathChildrenCache(CuratorFramework client, -+ String path, -+ boolean cacheData, -+ DescendantHandlingMode descendantHandlingMode) { -+ this(client, path, cacheData, descendantHandlingMode, false, new CloseableExecutorService( -+ Executors.newSingleThreadExecutor(defaultThreadFactory), true)); - } - - /** -- * @param client the client -- * @param path path to watch -- * @param cacheData if true, node contents are cached in addition to the stat -- * @param dataIsCompressed if true, data in the path is compressed -- * @param threadFactory factory to use when creating internal threads -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param cacheData -+ * if true, node contents are cached in addition to the stat -+ * @param threadFactory -+ * factory to use when creating internal threads - */ -- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) -- { -- this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); -+ public PathChildrenCache(CuratorFramework client, -+ String path, -+ boolean cacheData, -+ ThreadFactory threadFactory) { -+ this(client, path, cacheData, false, new CloseableExecutorService( -+ Executors.newSingleThreadExecutor(threadFactory), true)); - } - - /** -- * @param client the client -- * @param path path to watch -- * @param cacheData if true, node contents are cached in addition to the stat -- * @param dataIsCompressed if true, data in the path is compressed -- * @param executorService ExecutorService to use for the PathChildrenCache's background thread -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param cacheData -+ * if true, node contents are cached in addition to the stat -+ * @param dataIsCompressed -+ * if true, data in the path is compressed -+ * @param threadFactory -+ * factory to use when creating internal threads - */ -- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService) -- { -- this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); -+ public PathChildrenCache(CuratorFramework client, -+ String path, -+ boolean cacheData, -+ boolean dataIsCompressed, -+ ThreadFactory threadFactory) { -+ this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService( -+ Executors.newSingleThreadExecutor(threadFactory), true)); - } - - /** -- * @param client the client -- * @param path path to watch -- * @param cacheData if true, node contents are cached in addition to the stat -- * @param dataIsCompressed if true, data in the path is compressed -- * @param executorService Closeable ExecutorService to use for the PathChildrenCache's background thread -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param cacheData -+ * if true, node contents are cached in addition to the stat -+ * @param dataIsCompressed -+ * if true, data in the path is compressed -+ * @param executorService -+ * ExecutorService to use for the PathChildrenCache's background thread - */ -- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) -- { -+ public PathChildrenCache(CuratorFramework client, -+ String path, -+ boolean cacheData, -+ boolean dataIsCompressed, -+ final ExecutorService executorService) { -+ this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService( -+ executorService)); -+ } -+ -+ /** -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param cacheData -+ * if true, node contents are cached in addition to the stat -+ * @param dataIsCompressed -+ * if true, data in the path is compressed -+ * @param executorService -+ * Closeable ExecutorService to use for the PathChildrenCache's background thread -+ */ -+ public PathChildrenCache(CuratorFramework client, -+ String path, -+ boolean cacheData, -+ boolean dataIsCompressed, -+ final CloseableExecutorService executorService) { -+ this(client, path, cacheData, DescendantHandlingMode.DIRECT_DESCENDANTS_ONLY, -+ dataIsCompressed, executorService); -+ } -+ -+ /** -+ * @param client -+ * the client -+ * @param path -+ * path to watch -+ * @param cacheData -+ * if true, node contents are cached in addition to the stat -+ * @param descendantHandlingMode -+ * Mode defining if only descendants of the root node will be considered or whether -+ * the entire tree will be. -+ * @param dataIsCompressed -+ * if true, data in the path is compressed -+ * @param executorService -+ * Closeable ExecutorService to use for the PathChildrenCache's background thread -+ */ -+ public PathChildrenCache(CuratorFramework client, -+ String path, -+ boolean cacheData, -+ DescendantHandlingMode descendantHandlingMode, -+ boolean dataIsCompressed, -+ final CloseableExecutorService executorService) { - this.client = client; - this.path = path; - this.cacheData = cacheData; -+ this.descendantHandlingMode = descendantHandlingMode; - this.dataIsCompressed = dataIsCompressed; - this.executorService = executorService; - ensurePath = client.newNamespaceAwareEnsurePath(path); -@@ -223,42 +321,43 @@ public class PathChildrenCache implements Closeable - - /** - * Start the cache. The cache is not started automatically. You must call this method. -- * -- * @throws Exception errors -+ * -+ * @throws Exception -+ * errors - */ -- public void start() throws Exception -- { -+ public void start() throws Exception { - start(StartMode.NORMAL); - } - - /** - * Same as {@link #start()} but gives the option of doing an initial build -- * -- * @param buildInitial if true, {@link #rebuild()} will be called before this method -- * returns in order to get an initial view of the node; otherwise, -- * the cache will be initialized asynchronously -- * @throws Exception errors -+ * -+ * @param buildInitial -+ * if true, {@link #rebuild()} will be called before this method returns in order to -+ * get an initial view of the node; otherwise, the cache will be initialized -+ * asynchronously -+ * @throws Exception -+ * errors - * @deprecated use {@link #start(StartMode)} - */ -- public void start(boolean buildInitial) throws Exception -- { -+ @Deprecated -+ public void start(boolean buildInitial) throws Exception { - start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL); - } - - /** - * Method of priming cache on {@link PathChildrenCache#start(StartMode)} - */ -- public enum StartMode -- { -+ public enum StartMode { - /** -- * cache will _not_ be primed. i.e. it will start empty and you will receive -- * events for all nodes added, etc. -+ * cache will _not_ be primed. i.e. it will start empty and you will receive events for all -+ * nodes added, etc. - */ - NORMAL, - - /** -- * {@link PathChildrenCache#rebuild()} will be called before this method returns in -- * order to get an initial view of the node. -+ * {@link PathChildrenCache#rebuild()} will be called before this method returns in order to -+ * get an initial view of the node. - */ - BUILD_INITIAL_CACHE, - -@@ -271,34 +370,32 @@ public class PathChildrenCache implements Closeable - - /** - * Start the cache. The cache is not started automatically. You must call this method. -- * -- * @param mode Method for priming the cache -- * @throws Exception errors -+ * -+ * @param mode -+ * Method for priming the cache -+ * @throws Exception -+ * errors - */ -- public void start(StartMode mode) throws Exception -- { -- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started"); -+ public void start(StartMode mode) throws Exception { -+ Preconditions.checkState( -+ state.compareAndSet(State.LATENT, State.STARTED), "already started"); - mode = Preconditions.checkNotNull(mode, "mode cannot be null"); - - client.getConnectionStateListenable().addListener(connectionStateListener); - -- switch ( mode ) -- { -- case NORMAL: -- { -+ switch (mode) { -+ case NORMAL: { - offerOperation(new RefreshOperation(this, RefreshMode.STANDARD)); - break; - } - -- case BUILD_INITIAL_CACHE: -- { -+ case BUILD_INITIAL_CACHE: { - rebuild(); - break; - } - -- case POST_INITIALIZED_EVENT: -- { -- initialSet.set(Maps.<String, ChildData>newConcurrentMap()); -+ case POST_INITIALIZED_EVENT: { -+ initialSet.set(Maps.<String, ChildData> newConcurrentMap()); - offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED)); - break; - } -@@ -306,13 +403,13 @@ public class PathChildrenCache implements Closeable - } - - /** -- * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying -- * for all needed data WITHOUT generating any events to send to listeners. -- * -- * @throws Exception errors -+ * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying for all -+ * needed data WITHOUT generating any events to send to listeners. -+ * -+ * @throws Exception -+ * errors - */ -- public void rebuild() throws Exception -- { -+ public void rebuild() throws Exception { - Preconditions.checkState(!executorService.isShutdown(), "cache has been closed"); - - ensurePath.ensure(client.getZookeeperClient()); -@@ -320,13 +417,11 @@ public class PathChildrenCache implements Closeable - clear(); - - List<String> children = client.getChildren().forPath(path); -- for ( String child : children ) -- { -+ for (String child : children) { - String fullPath = ZKPaths.makePath(path, child); - internalRebuildNode(fullPath); - -- if ( rebuildTestExchanger != null ) -- { -+ if (rebuildTestExchanger != null) { - rebuildTestExchanger.exchange(new Object()); - } - } -@@ -335,16 +430,34 @@ public class PathChildrenCache implements Closeable - offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT)); - } - -+ private void rebuild(String root) throws Exception { -+ Preconditions.checkState(!executorService.isShutdown(), "cache has been closed"); -+ -+ ensurePath.ensure(client.getZookeeperClient()); -+ -+ List<String> children = client.getChildren().forPath(path); -+ if (children != null && !children.isEmpty()) { -+ for (String child : children) { -+ rebuild(child); -+ } -+ -+ } -+ -+ } -+ - /** - * NOTE: this is a BLOCKING method. Rebuild the internal cache for the given node by querying - * for all needed data WITHOUT generating any events to send to listeners. -- * -- * @param fullPath full path of the node to rebuild -- * @throws Exception errors -+ * -+ * @param fullPath -+ * full path of the node to rebuild -+ * @throws Exception -+ * errors - */ -- public void rebuildNode(String fullPath) throws Exception -- { -- Preconditions.checkArgument(ZKPaths.getPathAndNode(fullPath).getPath().equals(path), "Node is not part of this cache: " + fullPath); -+ public void rebuildNode(String fullPath) throws Exception { -+ Preconditions.checkArgument( -+ ZKPaths.getPathAndNode(fullPath).getPath().equals(path), -+ "Node is not part of this cache: " + fullPath); - Preconditions.checkState(!executorService.isShutdown(), "cache has been closed"); - - ensurePath.ensure(client.getZookeeperClient()); -@@ -357,14 +470,13 @@ public class PathChildrenCache implements Closeable - - /** - * Close/end the cache -- * -- * @throws IOException errors -+ * -+ * @throws IOException -+ * errors - */ - @Override -- public void close() throws IOException -- { -- if ( state.compareAndSet(State.STARTED, State.CLOSED) ) -- { -+ public void close() throws IOException { -+ if (state.compareAndSet(State.STARTED, State.CLOSED)) { - client.getConnectionStateListenable().removeListener(connectionStateListener); - executorService.close(); - } -@@ -372,64 +484,61 @@ public class PathChildrenCache implements Closeable - - /** - * Return the cache listenable -- * -+ * - * @return listenable - */ -- public ListenerContainer<PathChildrenCacheListener> getListenable() -- { -+ public ListenerContainer<PathChildrenCacheListener> getListenable() { - return listeners; - } - - /** -- * Return the current data. There are no guarantees of accuracy. This is -- * merely the most recent view of the data. The data is returned in sorted order. -- * -+ * Return the current data. There are no guarantees of accuracy. This is merely the most recent -+ * view of the data. The data is returned in sorted order. -+ * - * @return list of children and data - */ -- public List<ChildData> getCurrentData() -- { -- return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values())); -+ public List<ChildData> getCurrentData() { -+ return ImmutableList.copyOf(Sets.<ChildData> newTreeSet(currentData.values())); - } - - /** - * Return the current data for the given path. There are no guarantees of accuracy. This is -- * merely the most recent view of the data. If there is no child with that path, <code>null</code> -- * is returned. -- * -- * @param fullPath full path to the node to check -+ * merely the most recent view of the data. If there is no child with that path, -+ * <code>null</code> is returned. -+ * -+ * @param fullPath -+ * full path to the node to check - * @return data or null - */ -- public ChildData getCurrentData(String fullPath) -- { -+ public ChildData getCurrentData(String fullPath) { - return currentData.get(fullPath); - } - - /** -- * As a memory optimization, you can clear the cached data bytes for a node. Subsequent -- * calls to {@link ChildData#getData()} for this node will return <code>null</code>. -- * -- * @param fullPath the path of the node to clear -+ * As a memory optimization, you can clear the cached data bytes for a node. Subsequent calls to -+ * {@link ChildData#getData()} for this node will return <code>null</code>. -+ * -+ * @param fullPath -+ * the path of the node to clear - */ -- public void clearDataBytes(String fullPath) -- { -+ public void clearDataBytes(String fullPath) { - clearDataBytes(fullPath, -1); - } - - /** -- * As a memory optimization, you can clear the cached data bytes for a node. Subsequent -- * calls to {@link ChildData#getData()} for this node will return <code>null</code>. -- * -- * @param fullPath the path of the node to clear -- * @param ifVersion if non-negative, only clear the data if the data's version matches this version -+ * As a memory optimization, you can clear the cached data bytes for a node. Subsequent calls to -+ * {@link ChildData#getData()} for this node will return <code>null</code>. -+ * -+ * @param fullPath -+ * the path of the node to clear -+ * @param ifVersion -+ * if non-negative, only clear the data if the data's version matches this version - * @return true if the data was cleared - */ -- public boolean clearDataBytes(String fullPath, int ifVersion) -- { -+ public boolean clearDataBytes(String fullPath, int ifVersion) { - ChildData data = currentData.get(fullPath); -- if ( data != null ) -- { -- if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) ) -- { -+ if (data != null) { -+ if ((ifVersion < 0) || (ifVersion == data.getStat().getVersion())) { - data.clearData(); - return true; - } -@@ -439,275 +548,254 @@ public class PathChildrenCache implements Closeable - - /** - * Clear out current data and begin a new query on the path -- * -- * @throws Exception errors -+ * -+ * @throws Exception -+ * errors - */ -- public void clearAndRefresh() throws Exception -- { -+ public void clearAndRefresh() throws Exception { - currentData.clear(); - offerOperation(new RefreshOperation(this, RefreshMode.STANDARD)); - } - - /** -- * Clears the current data without beginning a new query and without generating any events -- * for listeners. -+ * Clears the current data without beginning a new query and without generating any events for -+ * listeners. - */ -- public void clear() -- { -+ public void clear() { - currentData.clear(); - } - -- enum RefreshMode -- { -- STANDARD, -- FORCE_GET_DATA_AND_STAT, -- POST_INITIALIZED -+ enum RefreshMode { -+ STANDARD, FORCE_GET_DATA_AND_STAT, POST_INITIALIZED - } - -- void refresh(final RefreshMode mode) throws Exception -- { -+ void refresh(final RefreshMode mode, final String nodePath) throws Exception { - ensurePath.ensure(client.getZookeeperClient()); - -- final BackgroundCallback callback = new BackgroundCallback() -- { -+ final BackgroundCallback callback = new BackgroundCallback() { -+ - @Override -- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception -- { -- processChildren(event.getChildren(), mode); -+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { -+ processChildren(event.getPath(), event.getChildren(), mode); - } - }; - -- client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path); -+ client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(nodePath); - } - -- void callListeners(final PathChildrenCacheEvent event) -- { -- listeners.forEach -- ( -- new Function<PathChildrenCacheListener, Void>() -- { -- @Override -- public Void apply(PathChildrenCacheListener listener) -- { -- try -- { -- listener.childEvent(client, event); -- } -- catch ( Exception e ) -- { -- handleException(e); -- } -- return null; -- } -+ void refresh(final RefreshMode mode) throws Exception { -+ refresh(mode, path); -+ } -+ -+ void callListeners(final PathChildrenCacheEvent event) { -+ listeners.forEach(new Function<PathChildrenCacheListener, Void>() { -+ -+ @Override -+ public Void apply(PathChildrenCacheListener listener) { -+ try { -+ listener.childEvent(client, event); -+ } catch (Exception e) { -+ handleException(e); - } -- ); -+ return null; -+ } -+ }); - } - -- void getDataAndStat(final String fullPath) throws Exception -- { -- BackgroundCallback existsCallback = new BackgroundCallback() -- { -+ void getDataAndStat(final String fullPath) throws Exception { -+ BackgroundCallback existsCallback = new BackgroundCallback() { -+ - @Override -- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception -- { -+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - applyNewData(fullPath, event.getResultCode(), event.getStat(), null); - } - }; - -- BackgroundCallback getDataCallback = new BackgroundCallback() -- { -+ BackgroundCallback getDataCallback = new BackgroundCallback() { -+ - @Override -- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception -- { -+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - applyNewData(fullPath, event.getResultCode(), event.getStat(), event.getData()); - } - }; - -- if ( cacheData ) -- { -- if ( dataIsCompressed ) -- { -- client.getData().decompressed().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath); -- } -- else -- { -- client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath); -+ if (cacheData) { -+ if (dataIsCompressed) { -+ client.getData().decompressed().usingWatcher(dataWatcher).inBackground( -+ getDataCallback).forPath(fullPath); -+ } else { -+ client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath( -+ fullPath); - } -- } -- else -- { -- client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath(fullPath); -+ } else { -+ client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath( -+ fullPath); - } - } - - /** - * Default behavior is just to log the exception -- * -- * @param e the exception -+ * -+ * @param e -+ * the exception - */ -- protected void handleException(Throwable e) -- { -+ protected void handleException(Throwable e) { - log.error("", e); - } - - @VisibleForTesting -- protected void remove(String fullPath) -- { -+ protected void remove(String fullPath) { - ChildData data = currentData.remove(fullPath); -- if ( data != null ) -- { -- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, data))); -+ if (data != null) { -+ offerOperation(new EventOperation(this, new PathChildrenCacheEvent( -+ PathChildrenCacheEvent.Type.CHILD_REMOVED, data))); - } - - Map<String, ChildData> localInitialSet = initialSet.get(); -- if ( localInitialSet != null ) -- { -+ if (localInitialSet != null) { - localInitialSet.remove(fullPath); - maybeOfferInitializedEvent(localInitialSet); - } - } - -- private void internalRebuildNode(String fullPath) throws Exception -- { -- if ( cacheData ) -- { -- try -- { -+ private void internalRebuildNode(String fullPath) throws Exception { -+ if (cacheData) { -+ try { - Stat stat = new Stat(); -- byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(fullPath); -+ byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn( -+ stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath( -+ fullPath); - currentData.put(fullPath, new ChildData(fullPath, stat, bytes)); -- } -- catch ( KeeperException.NoNodeException ignore ) -- { -+ } catch (KeeperException.NoNodeException ignore) { - // node no longer exists - remove it - currentData.remove(fullPath); - } -- } -- else -- { -+ } else { - Stat stat = client.checkExists().forPath(fullPath); -- if ( stat != null ) -- { -+ if (stat != null) { - currentData.put(fullPath, new ChildData(fullPath, stat, null)); -- } -- else -- { -+ } else { - // node no longer exists - remove it - currentData.remove(fullPath); - } - } - } - -- private void handleStateChange(ConnectionState newState) -- { -- switch ( newState ) -- { -- case SUSPENDED: -- { -- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null))); -- break; -- } -- -- case LOST: -- { -- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null))); -- break; -- } -+ private void handleStateChange(ConnectionState newState) { -+ switch (newState) { -+ case SUSPENDED: { -+ offerOperation(new EventOperation(this, new PathChildrenCacheEvent( -+ PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null))); -+ break; -+ } - -- case RECONNECTED: -- { -- try -- { -- offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT)); -- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null))); -+ case LOST: { -+ offerOperation(new EventOperation(this, new PathChildrenCacheEvent( -+ PathChildrenCacheEvent.Type.CONNECTION_LOST, null))); -+ break; - } -- catch ( Exception e ) -- { -- handleException(e); -+ -+ case RECONNECTED: { -+ try { -+ offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT)); -+ offerOperation(new EventOperation(this, new PathChildrenCacheEvent( -+ PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null))); -+ } catch (Exception e) { -+ handleException(e); -+ } -+ break; - } -- break; -- } - } - } - -- private void processChildren(List<String> children, RefreshMode mode) throws Exception -- { -- List<String> fullPaths = Lists.newArrayList(Lists.transform -- ( -- children, -- new Function<String, String>() -- { -+ private void processChildren(final String root, List<String> children, RefreshMode mode) -+ throws Exception { -+ List<String> fullPaths = Lists.newArrayList(Lists.transform( -+ children, new Function<String, String>() { -+ - @Override -- public String apply(String child) -- { -- return ZKPaths.makePath(path, child); -+ public String apply(String child) { -+ return ZKPaths.makePath(root, child); - } -- } -- )); -+ })); - Set<String> removedNodes = Sets.newHashSet(currentData.keySet()); - removedNodes.removeAll(fullPaths); -+ removedNodes.remove(root); - -- for ( String fullPath : removedNodes ) -- { -- remove(fullPath); -+ Set<String> nodesToKeep = Sets.newHashSet(); -+ for (String removedNode : removedNodes) { -+ // Don't remove the current node being processed, or any of its parent nodes -+ if (removedNode.length() <= root.length() && root.startsWith(removedNode)) { -+ nodesToKeep.add(removedNode); -+ } - } - -- for ( String name : children ) -- { -- String fullPath = ZKPaths.makePath(path, name); -+ for (String fullPath : removedNodes) { -+ if (!nodesToKeep.contains(fullPath)) { -+ remove(fullPath); -+ } -+ } - -- if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) ) -- { -+ for (String name : children) { -+ String fullPath = ZKPaths.makePath(root, name); -+ -+ boolean exists = currentData.containsKey(fullPath); -+ -+ if ((mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !exists) { - getDataAndStat(fullPath); - } - - updateInitialSet(name, NULL_CHILD_DATA); -+ -+ if (!exists && descendantHandlingMode == DescendantHandlingMode.ALL_DESCENDANTS) { -+ if (!removedNodes.contains(fullPath)) { -+ refresh(mode, fullPath); -+ } -+ } - } - maybeOfferInitializedEvent(initialSet.get()); - } - -- private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes) -- { -- if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped or something - we should be getting another event -+ private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes) { -+ if (resultCode == KeeperException.Code.OK.intValue()) // otherwise - node must have dropped -+ // or something - we should be getting -+ // another event - { - ChildData data = new ChildData(fullPath, stat, bytes); - ChildData previousData = currentData.put(fullPath, data); -- if ( previousData == null ) // i.e. new -- { -- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data))); -- } -- else if ( previousData.getStat().getVersion() != stat.getVersion() ) -+ if (previousData == null) // i.e. new - { -- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data))); -+ offerOperation(new EventOperation(this, new PathChildrenCacheEvent( -+ PathChildrenCacheEvent.Type.CHILD_ADDED, data))); -+ } else if (previousData.getStat().getVersion() != stat.getVersion()) { -+ offerOperation(new EventOperation(this, new PathChildrenCacheEvent( -+ PathChildrenCacheEvent.Type.CHILD_UPDATED, data))); - } - updateInitialSet(ZKPaths.getNodeFromPath(fullPath), data); - } - } - -- private void updateInitialSet(String name, ChildData data) -- { -+ private void updateInitialSet(String name, ChildData data) { - Map<String, ChildData> localInitialSet = initialSet.get(); -- if ( localInitialSet != null ) -- { -+ if (localInitialSet != null) { - localInitialSet.put(name, data); - maybeOfferInitializedEvent(localInitialSet); - } - } - -- private void maybeOfferInitializedEvent(Map<String, ChildData> localInitialSet) -- { -- if ( !hasUninitialized(localInitialSet) ) -- { -+ private void maybeOfferInitializedEvent(Map<String, ChildData> localInitialSet) { -+ if (!hasUninitialized(localInitialSet)) { - // all initial children have been processed - send initialized message - -- if ( initialSet.getAndSet(null) != null ) // avoid edge case - don't send more than 1 INITIALIZED event -+ if (initialSet.getAndSet(null) != null) // avoid edge case - don't send more than 1 -+ // INITIALIZED event - { - final List<ChildData> children = ImmutableList.copyOf(localInitialSet.values()); -- PathChildrenCacheEvent event = new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null) -- { -+ PathChildrenCacheEvent event = new PathChildrenCacheEvent( -+ PathChildrenCacheEvent.Type.INITIALIZED, null) { -+ - @Override -- public List<ChildData> getInitialData() -- { -+ public List<ChildData> getInitialData() { - return children; - } - }; -@@ -716,71 +804,56 @@ public class PathChildrenCache implements Closeable - } - } - -- private boolean hasUninitialized(Map<String, ChildData> localInitialSet) -- { -- if ( localInitialSet == null ) -- { -+ private boolean hasUninitialized(Map<String, ChildData> localInitialSet) { -+ if (localInitialSet == null) { - return false; - } - -- Map<String, ChildData> uninitializedChildren = Maps.filterValues -- ( -- localInitialSet, -- new Predicate<ChildData>() -- { -+ Map<String, ChildData> uninitializedChildren = Maps.filterValues( -+ localInitialSet, new Predicate<ChildData>() { -+ - @Override -- public boolean apply(ChildData input) -- { -- return (input == NULL_CHILD_DATA); // check against ref intentional -+ public boolean apply(ChildData input) { -+ return (input == NULL_CHILD_DATA); // check against ref intentional - } -- } -- ); -+ }); - return (uninitializedChildren.size() != 0); - } - -- private void offerOperation(final Operation operation) -- { -- if ( operationsQuantizer.add(operation) ) -- { -- submitToExecutor -- ( -- new Runnable() -- { -- @Override -- public void run() -- { -- try -- { -- operationsQuantizer.remove(operation); -- operation.invoke(); -- } -- catch ( Exception e ) -- { -- handleException(e); -- } -+ private void offerOperation(final Operation operation) { -+ if (operationsQuantizer.add(operation)) { -+ submitToExecutor(new Runnable() { -+ -+ @Override -+ public void run() { -+ try { -+ operationsQuantizer.remove(operation); -+ operation.invoke(); -+ } catch (Exception e) { -+ handleException(e); - } - } -- ); -+ }); - } - } - - /** - * Submits a runnable to the executor. - * <p/> -- * This method is synchronized because it has to check state about whether this instance is still open. Without this check -- * there is a race condition with the dataWatchers that get set. Even after this object is closed() it can still be -- * called by those watchers, because the close() method cannot actually disable the watcher. -+ * This method is synchronized because it has to check state about whether this instance is -+ * still open. Without this check there is a race condition with the dataWatchers that get set. -+ * Even after this object is closed() it can still be called by those watchers, because the -+ * close() method cannot actually disable the watcher. - * <p/> -- * The synchronization overhead should be minimal if non-existant as this is generally only called from the -- * ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state -- * we want to protect from. -- * -- * @param command The runnable to run -+ * The synchronization overhead should be minimal if non-existant as this is generally only -+ * called from the ZK client thread and will only contend if close() is called in parallel with -+ * an update, and that's the exact state we want to protect from. -+ * -+ * @param command -+ * The runnable to run - */ -- private synchronized void submitToExecutor(final Runnable command) -- { -- if ( state.get() == State.STARTED ) -- { -+ private synchronized void submitToExecutor(final Runnable command) { -+ if (state.get() == State.STARTED) { - executorService.submit(command); - } - } -diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java -index 4b117fb..e524153 100644 ---- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java -+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java -@@ -1,29 +1,37 @@ - /** -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, -- * software distributed under the License is distributed on an -- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -- * KIND, either express or implied. See the License for the -- * specific language governing permissions and limitations -- * under the License. -+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license -+ * agreements. See the NOTICE file distributed with this work for additional information regarding -+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the -+ * "License"); you may not use this file except in compliance with the License. You may obtain a -+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable -+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" -+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License -+ * for the specific language governing permissions and limitations under the License. - */ - package org.apache.curator.framework.recipes.cache; - --import com.google.common.collect.Lists; --import com.google.common.io.Closeables; -+import java.util.Collection; -+import java.util.List; -+import java.util.concurrent.BlockingQueue; -+import java.util.concurrent.Callable; -+import java.util.concurrent.CountDownLatch; -+import java.util.concurrent.Exchanger; -+import java.util.concurrent.ExecutionException; -+import java.util.concurrent.ExecutorService; -+import java.util.concurrent.Executors; -+import java.util.concurrent.Future; -+import java.util.concurrent.LinkedBlockingQueue; -+import java.util.concurrent.Semaphore; -+import java.util.concurrent.TimeUnit; -+import java.util.concurrent.TimeoutException; -+import java.util.concurrent.atomic.AtomicInteger; -+import java.util.concurrent.atomic.AtomicReference; -+ - import org.apache.curator.framework.CuratorFramework; - import org.apache.curator.framework.CuratorFrameworkFactory; - import org.apache.curator.framework.api.UnhandledErrorListener; - import org.apache.curator.framework.recipes.BaseClassForTests; -+import org.apache.curator.framework.recipes.cache.PathChildrenCache.DescendantHandlingMode; - import org.apache.curator.retry.RetryOneTime; - import org.apache.curator.test.KillSession; - import org.apache.curator.test.Timing; -@@ -31,57 +39,48 @@ import org.apache.zookeeper.CreateMode; - import org.apache.zookeeper.KeeperException; - import org.testng.Assert; - import org.testng.annotations.Test; --import java.util.Collection; --import java.util.List; --import java.util.concurrent.*; --import java.util.concurrent.atomic.AtomicInteger; --import java.util.concurrent.atomic.AtomicReference; - --public class TestPathChildrenCache extends BaseClassForTests --{ -+import com.google.common.collect.Lists; -+import com.google.common.io.Closeables; -+ -+public class TestPathChildrenCache extends BaseClassForTests { -+ - @Test -- public void testPostInitializedForEmpty() throws Exception -- { -+ public void testPostInitializedForEmpty() throws Exception { - Timing timing = new Timing(); - PathChildrenCache cache = null; -- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); -- try -- { -+ CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime( -+ 1)); -+ try { - client.start(); - - final CountDownLatch latch = new CountDownLatch(1); - cache = new PathChildrenCache(client, "/test", true); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ) -- { -- latch.countDown(); -- } -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) { -+ latch.countDown(); - } - } -- ); -+ }); - cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - Assert.assertTrue(timing.awaitLatch(latch)); -- } -- finally -- { -+ } finally { - Closeables.closeQuietly(cache); - Closeables.closeQuietly(client); - } - } - - @Test -- public void testAsyncInitialPopulation() throws Exception -- { -+ public void testAsyncInitialPopulation() throws Exception { - PathChildrenCache cache = null; -- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); -- try -- { -+ CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), new RetryOneTime(1)); -+ try { - client.start(); - - client.create().forPath("/test"); -@@ -89,17 +88,14 @@ public class TestPathChildrenCache extends BaseClassForTests - - final BlockingQueue<PathChildrenCacheEvent> events = new LinkedBlockingQueue<PathChildrenCacheEvent>(); - cache = new PathChildrenCache(client, "/test", true); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- events.offer(event); -- } -- } -- ); -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ events.offer(event); -+ } -+ }); - cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - - PathChildrenCacheEvent event = events.poll(10, TimeUnit.SECONDS); -@@ -108,22 +104,20 @@ public class TestPathChildrenCache extends BaseClassForTests - event = events.poll(10, TimeUnit.SECONDS); - Assert.assertEquals(event.getType(), PathChildrenCacheEvent.Type.INITIALIZED); - Assert.assertEquals(event.getInitialData().size(), 1); -- } -- finally -- { -+ } finally { - Closeables.closeQuietly(cache); - Closeables.closeQuietly(client); - } - } - - @Test -- public void testChildrenInitialized() throws Exception -- { -+ public void testChildrenInitialized() throws Exception { - Timing timing = new Timing(); - PathChildrenCache cache = null; -- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); -- try -- { -+ CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime( -+ 1)); -+ try { - client.start(); - client.create().forPath("/test"); - -@@ -131,24 +125,18 @@ public class TestPathChildrenCache extends BaseClassForTests - - final CountDownLatch addedLatch = new CountDownLatch(3); - final CountDownLatch initLatch = new CountDownLatch(1); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) -- { -- addedLatch.countDown(); -- } -- else if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ) -- { -- initLatch.countDown(); -- } -- } -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { -+ addedLatch.countDown(); -+ } else if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) { -+ initLatch.countDown(); - } -- ); -+ } -+ }); - - client.create().forPath("/test/1", "1".getBytes()); - client.create().forPath("/test/2", "2".getBytes()); -@@ -162,45 +150,37 @@ public class TestPathChildrenCache extends BaseClassForTests - Assert.assertEquals(cache.getCurrentData().get(0).getData(), "1".getBytes()); - Assert.assertEquals(cache.getCurrentData().get(1).getData(), "2".getBytes()); - Assert.assertEquals(cache.getCurrentData().get(2).getData(), "3".getBytes()); -- } -- finally -- { -+ } finally { - Closeables.closeQuietly(cache); - Closeables.closeQuietly(client); - } - } - - @Test -- public void testUpdateWhenNotCachingData() throws Exception -- { -+ public void testUpdateWhenNotCachingData() throws Exception { - Timing timing = new Timing(); - -- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); -+ CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime( -+ 1)); - client.start(); -- try -- { -+ try { - final CountDownLatch updatedLatch = new CountDownLatch(1); - final CountDownLatch addedLatch = new CountDownLatch(1); - client.create().creatingParentsIfNeeded().forPath("/test"); - PathChildrenCache cache = new PathChildrenCache(client, "/test", false); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED ) -- { -- updatedLatch.countDown(); -- } -- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) -- { -- addedLatch.countDown(); -- } -- } -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) { -+ updatedLatch.countDown(); -+ } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { -+ addedLatch.countDown(); - } -- ); -+ } -+ }); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - - client.create().forPath("/test/foo", "first".getBytes()); -@@ -208,94 +188,72 @@ public class TestPathChildrenCache extends BaseClassForTests - - client.setData().forPath("/test/foo", "something new".getBytes()); - Assert.assertTrue(timing.awaitLatch(updatedLatch)); -- } -- finally -- { -+ } finally { - Closeables.closeQuietly(client); - } - } - - @Test -- public void testEnsurePath() throws Exception -- { -+ public void testEnsurePath() throws Exception { - Timing timing = new Timing(); - -- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); -+ CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), new RetryOneTime(1)); - client.start(); -- try -- { -+ try { - PathChildrenCache cache = new PathChildrenCache(client, "/one/two/three", false); - cache.start(); - timing.sleepABit(); - -- try -- { -+ try { - client.create().forPath("/one/two/three/four"); -- } -- catch ( KeeperException.NoNodeException e ) -- { -+ } catch (KeeperException.NoNodeException e) { - Assert.fail("Path should exist", e); - } -- } -- finally -- { -+ } finally { - Closeables.closeQuietly(client); - } - } - - @Test -- public void testDeleteThenCreate() throws Exception -- { -- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); -+ public void testDeleteThenCreate() throws Exception { -+ CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), new RetryOneTime(1)); - client.start(); -- try -- { -+ try { - client.create().forPath("/test"); - client.create().forPath("/test/foo", "one".getBytes()); - - final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); -- client.getUnhandledErrorListenable().addListener -- ( -- new UnhandledErrorListener() -- { -- @Override -- public void unhandledError(String message, Throwable e) -- { -- error.set(e); -- } -- } -- ); -+ client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { -+ -+ @Override -+ public void unhandledError(String message, Throwable e) { -+ error.set(e); -+ } -+ }); - - final CountDownLatch removedLatch = new CountDownLatch(1); - final CountDownLatch postRemovedLatch = new CountDownLatch(1); - final CountDownLatch dataLatch = new CountDownLatch(1); - PathChildrenCache cache = new PathChildrenCache(client, "/test", true); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ) -- { -- removedLatch.countDown(); -- Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS)); -- } -- else -- { -- try -- { -- Assert.assertEquals(event.getData().getData(), "two".getBytes()); -- } -- finally -- { -- dataLatch.countDown(); -- } -- } -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) { -+ removedLatch.countDown(); -+ Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS)); -+ } else { -+ try { -+ Assert.assertEquals(event.getData().getData(), "two".getBytes()); -+ } finally { -+ dataLatch.countDown(); - } - } -- ); -+ } -+ }); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - - client.delete().forPath("/test/foo"); -@@ -305,26 +263,22 @@ public class TestPathChildrenCache extends BaseClassForTests - Assert.assertTrue(dataLatch.await(10, TimeUnit.SECONDS)); - - Throwable t = error.get(); -- if ( t != null ) -- { -+ if (t != null) { - Assert.fail("Assert", t); - } - - cache.close(); -- } -- finally -- { -+ } finally { - client.close(); - } - } - - @Test -- public void testRebuildAgainstOtherProcesses() throws Exception -- { -- final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); -+ public void testRebuildAgainstOtherProcesses() throws Exception { -+ final CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), new RetryOneTime(1)); - client.start(); -- try -- { -+ try { - client.create().forPath("/test"); - client.create().forPath("/test/foo"); - client.create().forPath("/test/bar"); -@@ -332,69 +286,56 @@ public class TestPathChildrenCache extends BaseClassForTests - - final CountDownLatch addedLatch = new CountDownLatch(2); - final PathChildrenCache cache = new PathChildrenCache(client, "/test", true); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) -- { -- if ( event.getData().getPath().equals("/test/test") ) -- { -- addedLatch.countDown(); -- } -- } -- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED ) -- { -- if ( event.getData().getPath().equals("/test/snafu") ) -- { -- addedLatch.countDown(); -- } -- } -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { -+ if (event.getData().getPath().equals("/test/test")) { -+ addedLatch.countDown(); -+ } -+ } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) { -+ if (event.getData().getPath().equals("/test/snafu")) { -+ addedLatch.countDown(); - } - } -- ); -+ } -+ }); - cache.rebuildTestExchanger = new Exchanger<Object>(); - ExecutorService service = Executors.newSingleThreadExecutor(); - final AtomicReference<String> deletedPath = new AtomicReference<String>(); -- Future<Object> future = service.submit -- ( -- new Callable<Object>() -- { -- @Override -- public Object call() throws Exception -- { -- cache.rebuildTestExchanger.exchange(new Object()); -- -- // simulate another process adding a node while we're rebuilding -- client.create().forPath("/test/test"); -- -- List<ChildData> currentData = cache.getCurrentData(); -- Assert.assertTrue(currentData.size() > 0); -- -- // simulate another process removing a node while we're rebuilding -- client.delete().forPath(currentData.get(0).getPath()); -- deletedPath.set(currentData.get(0).getPath()); -- -- cache.rebuildTestExchanger.exchange(new Object()); -- -- ChildData childData = null; -- while ( childData == null ) -- { -- childData = cache.getCurrentData("/test/snafu"); -- Thread.sleep(1000); -- } -- Assert.assertEquals(childData.getData(), "original".getBytes()); -- client.setData().forPath("/test/snafu", "grilled".getBytes()); -- -- cache.rebuildTestExchanger.exchange(new Object()); -- -- return null; -- } -+ Future<Object> future = service.submit(new Callable<Object>() { -+ -+ @Override -+ public Object call() throws Exception { -+ cache.rebuildTestExchanger.exchange(new Object()); -+ -+ // simulate another process adding a node while we're rebuilding -+ client.create().forPath("/test/test"); -+ -+ List<ChildData> currentData = cache.getCurrentData(); -+ Assert.assertTrue(currentData.size() > 0); -+ -+ // simulate another process removing a node while we're rebuilding -+ client.delete().forPath(currentData.get(0).getPath()); -+ deletedPath.set(currentData.get(0).getPath()); -+ -+ cache.rebuildTestExchanger.exchange(new Object()); -+ -+ ChildData childData = null; -+ while (childData == null) { -+ childData = cache.getCurrentData("/test/snafu"); -+ Thread.sleep(1000); - } -- ); -+ Assert.assertEquals(childData.getData(), "original".getBytes()); -+ client.setData().forPath("/test/snafu", "grilled".getBytes()); -+ -+ cache.rebuildTestExchanger.exchange(new Object()); -+ -+ return null; -+ } -+ }); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - future.get(); - -@@ -404,21 +345,18 @@ public class TestPathChildrenCache extends BaseClassForTests - Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes()); - - cache.close(); -- } -- finally -- { -+ } finally { - client.close(); - } - } - - // see https://github.com/Netflix/curator/issues/27 - was caused by not comparing old->new data - @Test -- public void testIssue27() throws Exception -- { -- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); -+ public void testIssue27() throws Exception { -+ CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), new RetryOneTime(1)); - client.start(); -- try -- { -+ try { - client.create().forPath("/base"); - client.create().forPath("/base/a"); - client.create().forPath("/base/b"); -@@ -429,18 +367,15 @@ public class TestPathChildrenCache extends BaseClassForTests - final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList(); - final Semaphore semaphore = new Semaphore(0); - PathChildrenCache cache = new PathChildrenCache(client, "/base", true); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- events.add(event.getType()); -- semaphore.release(); -- } -- } -- ); -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ events.add(event.getType()); -+ semaphore.release(); -+ } -+ }); - cache.start(); - - Assert.assertTrue(semaphore.tryAcquire(3, 10, TimeUnit.SECONDS)); -@@ -451,30 +386,25 @@ public class TestPathChildrenCache extends BaseClassForTests - client.create().forPath("/base/a"); - Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS)); - -- List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList -- ( -+ List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList( - PathChildrenCacheEvent.Type.CHILD_ADDED, - PathChildrenCacheEvent.Type.CHILD_ADDED, - PathChildrenCacheEvent.Type.CHILD_ADDED, - PathChildrenCacheEvent.Type.CHILD_REMOVED, -- PathChildrenCacheEvent.Type.CHILD_ADDED -- ); -+ PathChildrenCacheEvent.Type.CHILD_ADDED); - Assert.assertEquals(expected, events); -- } -- finally -- { -+ } finally { - client.close(); - } - } - - // test Issue 27 using new rebuild() method - @Test -- public void testIssue27Alt() throws Exception -- { -- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); -+ public void testIssue27Alt() throws Exception { -+ CuratorFramework client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), new RetryOneTime(1)); - client.start(); -- try -- { -+ try { - client.create().forPath("/base"); - client.create().forPath("/base/a"); - client.create().forPath("/base/b"); -@@ -485,18 +415,15 @@ public class TestPathChildrenCache extends BaseClassForTests - final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList(); - final Semaphore semaphore = new Semaphore(0); - PathChildrenCache cache = new PathChildrenCache(client, "/base", true); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- events.add(event.getType()); -- semaphore.release(); -- } -- } -- ); -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ events.add(event.getType()); -+ semaphore.release(); -+ } -+ }); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - - client.delete().forPath("/base/a"); -@@ -505,27 +432,23 @@ public class TestPathChildrenCache extends BaseClassForTests - client.create().forPath("/base/a"); - Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS)); - -- List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList -- ( -+ List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList( - PathChildrenCacheEvent.Type.CHILD_REMOVED, -- PathChildrenCacheEvent.Type.CHILD_ADDED -- ); -+ PathChildrenCacheEvent.Type.CHILD_ADDED); - Assert.assertEquals(expected, events); -- } -- finally -- { -+ } finally { - client.close(); - } - } - - @Test -- public void testKilledSession() throws Exception -- { -+ public void testKilledSession() throws Exception { - Timing timing = new Timing(); - CuratorFramework client = null; -- try -- { -- client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); -+ try { -+ client = CuratorFrameworkFactory.newClient( -+ server.getConnectString(), timing.session(), timing.connection(), -+ new RetryOneTime(1)); - client.start(); - client.create().forPath("/test"); - -@@ -536,32 +459,22 @@ public class TestPathChildrenCache extends BaseClassForTests - final CountDownLatch lostLatch = new CountDownLatch(1); - final CountDownLatch reconnectedLatch = new CountDownLatch(1); - final CountDownLatch removedLatch = new CountDownLatch(1); -- cache.getListenable().addListener -- ( -- new PathChildrenCacheListener() -- { -- @Override -- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception -- { -- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) -- { -- childAddedLatch.countDown(); -- } -- else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ) -- { -- lostLatch.countDown(); -- } -- else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ) -- { -- reconnectedLatch.countDown(); -- } -- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ) -- { -- removedLatch.countDown(); -- } -- } -+ cache.getListenable().addListener(new PathChildrenCacheListener() { -+ -+ @Override -+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) -+ throws Exception { -+ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { -+ childAddedLatch.countDown(); -+ } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST) { -+ lostLatch.countDown(); -+ } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED) { -+ reconnectedLatch.countDown(); -+ } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) { -+ removedLatch.countDown(); - } -- ); -+ } -+ }); - - client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes()); - Assert.assertTrue(timing.awaitLatch(childAddedLatch)); -@@ -570,54 +483,48 @@ public class TestPathChildrenCache extends BaseClassForTests - Assert.assertTrue(timing.awaitLatch(lostLatch)); - Assert.assertTrue(timing.awaitLatch(reconne <TRUNCATED>