continued work on porting old PathChildrenCache tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bf73f0d3 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bf73f0d3 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bf73f0d3 Branch: refs/heads/persistent-watch Commit: bf73f0d3999bfc21b1799ce0c9d3e06214479206 Parents: 01652ce Author: randgalt <randg...@apache.org> Authored: Fri Dec 30 12:03:41 2016 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri Dec 30 12:03:41 2016 -0500 ---------------------------------------------------------------------- .../framework/recipes/watch/CuratorCache.java | 27 +- .../recipes/watch/CuratorCacheBase.java | 57 +- .../recipes/watch/InternalCuratorCache.java | 79 +- .../recipes/watch/InternalNodeCache.java | 53 +- .../recipes/watch/PersistentWatcher.java | 23 +- .../framework/recipes/watch/Refresher.java | 18 +- .../cache/TestSingleLevelCuratorCache.java | 996 ------------------- .../watch/TestSingleLevelCuratorCache.java | 961 ++++++++++++++++++ 8 files changed, 1111 insertions(+), 1103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java index bc05270..c8a6ea2 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java @@ -24,7 +24,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; +import 
java.util.concurrent.CountDownLatch; /** * General interface for client-cached nodes. Create instances @@ -34,8 +34,10 @@ public interface CuratorCache extends Closeable { /** * Start the cache + * + * @return a latch that can be used to block until the initial refresh has completed */ - void start(); + CountDownLatch start(); @Override void close(); @@ -48,20 +50,21 @@ public interface CuratorCache extends Closeable Listenable<CacheListener> getListenable(); /** - * force-fill the cache by getting all applicable nodes. The returned future + * force-fill the cache by getting all applicable nodes. The returned latch * can be used to check/block for completion. * - * @return a future that signals when the refresh is complete + * @return a latch that signals when the refresh is complete */ - Future<Boolean> refreshAll(); + CountDownLatch refreshAll(); /** - * Refresh the given cached node + * Refresh the given cached node. The returned latch + * can be used to check/block for completion. * * @param path node full path - * @return a future that signals when the refresh is complete + * @return a latch that signals when the refresh is complete */ - Future<Boolean> refresh(String path); + CountDownLatch refresh(String path); /** * Remove the given path from the cache. @@ -149,4 +152,12 @@ public interface CuratorCache extends Closeable * @return true if the data was cleared */ boolean clearDataBytes(String path, int ifVersion); + + /** + * Returns the number of times this cache has been refreshed (manually via one of the refresh() + * methods, from starting, from connection problems, etc.).
+ * + * @return number of refreshes + */ + long refreshCount(); } http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java index 4f4427a..fcab38c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.watch; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; @@ -26,16 +27,25 @@ import java.util.Collection; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; abstract class CuratorCacheBase implements CuratorCache { protected final Cache<String, CachedNode> cache; - protected final ListenerContainer<CacheListener> listeners = new ListenerContainer<>(); - protected final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>(); + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final AtomicReference<CountDownLatch> initialRefreshLatch = new AtomicReference<>(new CountDownLatch(1)); private final boolean sendRefreshEvents; + private final AtomicInteger refreshCount = new AtomicInteger(0); - protected 
enum State + protected boolean isStarted() + { + return state.get() == State.STARTED; + } + + private enum State { LATENT, STARTED, @@ -142,6 +152,47 @@ abstract class CuratorCacheBase implements CuratorCache return false; } + @Override + public long refreshCount() + { + return refreshCount.get(); + } + + @Override + public final CountDownLatch start() + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started"); + + internalStart(); + return initialRefreshLatch.get(); + } + + @Override + public final void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + internalClose(); + listeners.clear(); + cache.invalidateAll(); + cache.cleanUp(); + } + } + + protected abstract void internalClose(); + + protected abstract void internalStart(); + + void incrementRefreshCount() + { + refreshCount.incrementAndGet(); + CountDownLatch latch = initialRefreshLatch.getAndSet(null); + if ( latch != null ) + { + latch.countDown(); + } + } + void notifyListeners(final CacheEvent eventType, final String path) { if ( state.get() != State.STARTED ) http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java index 14db725..ca83e9c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java @@ -18,22 +18,21 @@ */ package org.apache.curator.framework.recipes.watch; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import 
com.google.common.cache.Cache; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.Objects; -import java.util.concurrent.Future; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.atomic.AtomicInteger; class InternalCuratorCache extends CuratorCacheBase implements Watcher @@ -43,7 +42,6 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher private final String basePath; private final CacheFilter cacheFilter; private final RefreshFilter refreshFilter; - private final boolean refreshOnStart; private static final CachedNode nullNode = new CachedNode(); private static final RefreshFilter nopRefreshFilter = new RefreshFilter() { @@ -53,52 +51,38 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher return false; } }; - private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() - { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if ( newState.isConnected() ) - { - internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter); - } - } - }; - InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, boolean refreshOnStart) + InternalCuratorCache(CuratorFramework 
client, String path, CacheFilter cacheFilter, final RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, final boolean refreshOnStart) { super(cache, sendRefreshEvents); this.client = Objects.requireNonNull(client, "client cannot be null"); basePath = Objects.requireNonNull(path, "path cannot be null"); this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null"); this.refreshFilter = Objects.requireNonNull(refreshFilter, "primingFilter cannot be null"); - this.refreshOnStart = refreshOnStart; - watcher = new PersistentWatcher(client, path); + watcher = new PersistentWatcher(client, path) + { + @Override + protected void noteWatcherReset() + { + if ( refreshOnStart || (refreshCount() > 0) ) + { + internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter); + } + } + }; watcher.getListenable().addListener(this); } @Override - public void start() + protected void internalStart() { - Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started"); watcher.start(); - client.getConnectionStateListenable().addListener(connectionStateListener); - if ( refreshOnStart ) - { - internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter); - } } @Override - public void close() + protected void internalClose() { - if ( state.compareAndSet(State.STARTED, State.CLOSED) ) - { - client.getConnectionStateListenable().removeListener(connectionStateListener); - watcher.getListenable().removeListener(this); - listeners.clear(); - watcher.close(); - } + watcher.close(); } @Override @@ -131,29 +115,32 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher } @Override - public Future<Boolean> refreshAll() + public CountDownLatch refreshAll() { return refresh(basePath); } @Override - public Future<Boolean> refresh(String path) + public CountDownLatch refresh(String path) { 
Preconditions.checkArgument(path.startsWith(basePath), "Path is not this cache's tree: " + path); - if ( state.get() == State.STARTED ) + if ( isStarted() ) { - SettableFuture<Boolean> task = SettableFuture.create(); - Refresher refresher = new Refresher(this, path, task); + CountDownLatch latch = new CountDownLatch(1); + Refresher refresher = new Refresher(this, path, latch); internalRefresh(path, refresher, refreshFilter); - return task; + return latch; } - return Futures.immediateFuture(true); + return new CountDownLatch(0); } + @VisibleForTesting + volatile Exchanger<Object> rebuildTestExchanger; + private void internalRefresh(final String path, final Refresher refresher, final RefreshFilter refreshFilter) { - if ( state.get() != State.STARTED ) + if ( !isStarted() ) { return; } @@ -191,6 +178,10 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher // TODO } refresher.decrement(); + if ( rebuildTestExchanger != null ) + { + rebuildTestExchanger.exchange(new Object()); + } } }; @@ -208,6 +199,10 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher { notifyListeners(CacheEvent.NODE_CREATED, path); } + else + { + notifyListeners(CacheEvent.NODE_CHANGED, path); + } break; } http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java index c2624b7..3860679 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java @@ -19,7 +19,6 @@ package org.apache.curator.framework.recipes.watch; import 
com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import org.apache.curator.framework.CuratorFramework; @@ -36,8 +35,8 @@ import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -104,33 +103,27 @@ class InternalNodeCache extends CuratorCacheBase } @Override - public void start() + protected void internalStart() { - Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started"); - client.getConnectionStateListenable().addListener(connectionStateListener); refreshAll(); } @Override - public void close() + protected void internalClose() { - if ( state.compareAndSet(State.STARTED, State.CLOSED) ) - { - client.removeWatchers(); - listeners.clear(); - client.getConnectionStateListenable().removeListener(connectionStateListener); - } + client.removeWatchers(); + client.getConnectionStateListenable().removeListener(connectionStateListener); } @Override - public Future<Boolean> refreshAll() + public CountDownLatch refreshAll() { return null; // TODO } @Override - public Future<Boolean> refresh(String path) + public CountDownLatch refresh(String path) { return null; // TODO } @@ -156,7 +149,7 @@ class InternalNodeCache extends CuratorCacheBase private void reset(Refresher refresher) throws Exception { - if ( (state.get() == State.STARTED) && isConnected.get() ) + if ( isStarted() && isConnected.get() ) { refresher.increment(); client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback, refresher).forPath(path); @@ -229,15 +222,15 @@ class InternalNodeCache extends CuratorCacheBase CachedNode previousData = 
data.getAndSet(newData); if ( newData == null ) { - notifyListeners(CacheEvent.NODE_DELETED); + notifyListeners(CacheEvent.NODE_DELETED, path); } else if ( previousData == null ) { - notifyListeners(CacheEvent.NODE_CREATED); + notifyListeners(CacheEvent.NODE_CREATED, path); } else if ( !previousData.equals(newData) ) { - notifyListeners(CacheEvent.NODE_CHANGED); + notifyListeners(CacheEvent.NODE_CHANGED, path); } if ( rebuildTestExchanger != null ) @@ -252,28 +245,4 @@ class InternalNodeCache extends CuratorCacheBase } } } - - private void notifyListeners(final CacheEvent event) - { - listeners.forEach - ( - new Function<CacheListener, Void>() - { - @Override - public Void apply(CacheListener listener) - { - try - { - listener.process(event, path); - } - catch ( Exception e ) - { - ThreadUtils.checkInterrupted(e); - log.error("Calling listener", e); - } - return null; - } - } - ); - } } http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java index bc5ae7e..9bee7b1 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -21,10 +21,14 @@ package org.apache.curator.framework.recipes.watch; import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; import 
org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.io.Closeable; @@ -65,6 +69,17 @@ public class PersistentWatcher implements Closeable }; private final CuratorFramework client; private final String basePath; + private final BackgroundCallback backgroundCallback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( (event.getType() == CuratorEventType.ADD_PERSISTENT_WATCH) && (event.getResultCode() == KeeperException.Code.OK.intValue()) ) + { + noteWatcherReset(); + } + } + }; private enum State { @@ -100,6 +115,7 @@ public class PersistentWatcher implements Closeable { // TODO } + listeners.clear(); } } @@ -108,11 +124,16 @@ public class PersistentWatcher implements Closeable return listeners; } + protected void noteWatcherReset() + { + // provided for sub-classes to override + } + private void reset() { try { - client.addPersistentWatch().inBackground().usingWatcher(watcher).forPath(basePath); + client.addPersistentWatch().inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath); } catch ( Exception e ) { http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java index 7169c01..e81b475 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java +++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java @@ -18,14 +18,14 @@ */ package org.apache.curator.framework.recipes.watch; -import com.google.common.util.concurrent.SettableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; class Refresher { private final CuratorCacheBase cacheBase; private final String refreshPath; - private final SettableFuture<Boolean> task; + private final CountDownLatch latch; private final AtomicInteger count = new AtomicInteger(0); public Refresher(CuratorCacheBase cacheBase, String refreshPath) @@ -33,12 +33,12 @@ class Refresher this(cacheBase, refreshPath, null); } - Refresher(CuratorCacheBase cacheBase, String refreshPath, SettableFuture<Boolean> task) + Refresher(CuratorCacheBase cacheBase, String refreshPath, CountDownLatch latch) { this.cacheBase = cacheBase; this.refreshPath = refreshPath; - this.task = task; + this.latch = latch; } void increment() @@ -51,15 +51,11 @@ class Refresher if ( count.decrementAndGet() <= 0 ) { cacheBase.notifyListeners(CacheEvent.CACHE_REFRESHED, refreshPath); - if ( task != null ) + if ( latch != null ) { - task.set(true); + latch.countDown(); } + cacheBase.incrementRefreshCount(); } } - - boolean isCancelled() - { - return (task != null) && task.isCancelled(); - } } http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java deleted file mode 100644 index 6bf0e9e..0000000 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java +++ /dev/null @@ -1,996 +0,0 @@ 
-/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.curator.framework.recipes.cache; - -import com.google.common.collect.Lists; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.UnhandledErrorListener; -import org.apache.curator.framework.imps.TestCleanState; -import org.apache.curator.framework.recipes.watch.CacheEvent; -import org.apache.curator.framework.recipes.watch.CacheListener; -import org.apache.curator.framework.recipes.watch.CuratorCache; -import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.retry.RetryOneTime; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.test.ExecuteCalledWatchingExecutorService; -import org.apache.curator.test.KillSession; -import org.apache.curator.test.TestingServer; -import org.apache.curator.test.Timing; -import org.apache.curator.utils.CloseableUtils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import 
org.testng.Assert; -import org.testng.annotations.Test; -import java.util.List; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.testng.AssertJUnit.assertNotNull; - -public class TestSingleLevelCuratorCache extends BaseClassForTests -{ - private static final Timing timing = new Timing(); - - @Test - public void testWithBadConnect() throws Exception - { - final int serverPort = server.getPort(); - server.close(); - - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(1)); - try - { - client.start(); - - final CuratorCache cache = CuratorCacheBuilder.builder(client, "/").forSingleLevel().build(); - final CountDownLatch addedLatch = new CountDownLatch(1); - CacheListener listener = new CacheListener() - { - @Override - public void process(CacheEvent event, String path) - { - if ( (event == CacheEvent.NODE_CREATED) && path.equals("/baz")) - { - addedLatch.countDown(); - } - } - }; - cache.getListenable().addListener(listener); - cache.start(); - - final CountDownLatch connectedLatch = new CountDownLatch(1); - client.getConnectionStateListenable().addListener(new ConnectionStateListener() - { - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if(newState == ConnectionState.CONNECTED) - { - connectedLatch.countDown(); - } - } - }); - - server = new TestingServer(serverPort, true); - - Assert.assertTrue(timing.awaitLatch(connectedLatch)); - - client.create().creatingParentContainersIfNeeded().forPath("/baz", new byte[]{1, 2, 3}); - - assertNotNull("/baz does not exist", client.checkExists().forPath("/baz")); - - Assert.assertTrue(timing.awaitLatch(addedLatch)); - - assertNotNull("cache doesn't see /baz", cache.get("/baz").getData()); - } - finally - { - CloseableUtils.closeQuietly(client); - } - } - - @Test - public void testPostInitializedForEmpty() 
throws Exception - { - CuratorCache cache = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - try - { - client.start(); - - final CountDownLatch latch = new CountDownLatch(1); - cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build(); - cache.getListenable().addListener(new CacheListener() - { - @Override - public void process(CacheEvent event, String path) - { - if ( event == CacheEvent.CACHE_REFRESHED ) - { - latch.countDown(); - } - } - }); - cache.start(); - Assert.assertTrue(timing.awaitLatch(latch)); - } - finally - { - CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - @Test - public void testAsyncInitialPopulation() throws Exception - { - CuratorCache cache = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - try - { - client.start(); - - client.create().forPath("/test"); - client.create().forPath("/test/one", "hey there".getBytes()); - - final BlockingQueue<CacheEvent> events = new LinkedBlockingQueue<>(); - cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build(); - cache.getListenable().addListener(new CacheListener() - { - @Override - public void process(CacheEvent event, String path) - { - events.offer(event); - } - }); - cache.start(); - Future<Boolean> task = cache.refreshAll(); - - CacheEvent event = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS); - Assert.assertEquals(event, CacheEvent.NODE_CREATED); - - Assert.assertNotNull(task.get(timing.forWaiting().seconds(), TimeUnit.SECONDS)); - } - finally - { - CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - @Test - public void testChildrenInitialized() throws Exception - { - CuratorCache cache = null; - CuratorFramework client 
= CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - try - { - client.start(); - client.create().forPath("/test"); - - cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build(); - - final CountDownLatch addedLatch = new CountDownLatch(3); - final CountDownLatch initLatch = new CountDownLatch(1); - cache.getListenable().addListener(new CacheListener() - { - @Override - public void process(CacheEvent event, String path) - { - if ( event == CacheEvent.NODE_CREATED ) - { - addedLatch.countDown(); - } - else if ( event == CacheEvent.CACHE_REFRESHED ) - { - initLatch.countDown(); - } - } - }); - - client.create().forPath("/test/1", "1".getBytes()); - client.create().forPath("/test/2", "2".getBytes()); - client.create().forPath("/test/3", "3".getBytes()); - - cache.start(); - - Assert.assertTrue(timing.awaitLatch(addedLatch)); - Assert.assertTrue(timing.awaitLatch(initLatch)); - Assert.assertEquals(cache.size(), 3); - Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes()); - Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes()); - Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes()); - } - finally - { - CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - @Test - public void testChildrenInitializedNormal() throws Exception - { - CuratorCache cache = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - try - { - client.start(); - client.create().forPath("/test"); - - cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build(); - - final CountDownLatch addedLatch = new CountDownLatch(3); - cache.getListenable().addListener(new CacheListener() - { - @Override - public void process(CacheEvent event, String path) - { - Assert.assertNotEquals(event, 
CacheEvent.CACHE_REFRESHED); - if ( event == CacheEvent.NODE_CREATED ) - { - addedLatch.countDown(); - } - } - }); - - client.create().forPath("/test/1", "1".getBytes()); - client.create().forPath("/test/2", "2".getBytes()); - client.create().forPath("/test/3", "3".getBytes()); - - cache.start(); - - Assert.assertTrue(timing.awaitLatch(addedLatch)); - Assert.assertEquals(cache.size(), 3); - Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes()); - Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes()); - Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes()); - } - finally - { - CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testUpdateWhenNotCachingData() throws Exception - { - PathChildrenCache cache = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - final CountDownLatch updatedLatch = new CountDownLatch(1); - final CountDownLatch addedLatch = new CountDownLatch(1); - client.create().creatingParentsIfNeeded().forPath("/test"); - cache = new PathChildrenCache(client, "/test", false); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED ) - { - updatedLatch.countDown(); - } - else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) - { - addedLatch.countDown(); - } - } - } - ); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - - client.create().forPath("/test/foo", "first".getBytes()); - Assert.assertTrue(timing.awaitLatch(addedLatch)); - - client.setData().forPath("/test/foo", "something new".getBytes()); - Assert.assertTrue(timing.awaitLatch(updatedLatch)); - } - finally - { - 
CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testEnsurePath() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - try ( PathChildrenCache cache = new PathChildrenCache(client, "/one/two/three", false) ) - { - cache.start(); - timing.sleepABit(); - - try - { - client.create().forPath("/one/two/three/four"); - } - catch ( KeeperException.NoNodeException e ) - { - Assert.fail("Path should exist", e); - } - } - timing.sleepABit(); - } - finally - { - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testDeleteThenCreate() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - client.create().forPath("/test"); - client.create().forPath("/test/foo", "one".getBytes()); - - final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); - client.getUnhandledErrorListenable().addListener - ( - new UnhandledErrorListener() - { - @Override - public void unhandledError(String message, Throwable e) - { - error.set(e); - } - } - ); - - final CountDownLatch removedLatch = new CountDownLatch(1); - final CountDownLatch postRemovedLatch = new CountDownLatch(1); - final CountDownLatch dataLatch = new CountDownLatch(1); - try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true) ) - { - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ) - { - removedLatch.countDown(); - Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS)); - } - else - { - try - { - 
Assert.assertEquals(event.getData().getData(), "two".getBytes()); - } - finally - { - dataLatch.countDown(); - } - } - } - } - ); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - - client.delete().forPath("/test/foo"); - Assert.assertTrue(timing.awaitLatch(removedLatch)); - client.create().forPath("/test/foo", "two".getBytes()); - postRemovedLatch.countDown(); - Assert.assertTrue(timing.awaitLatch(dataLatch)); - - Throwable t = error.get(); - if ( t != null ) - { - Assert.fail("Assert", t); - } - } - } - finally - { - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testRebuildAgainstOtherProcesses() throws Exception - { - final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - client.create().forPath("/test"); - client.create().forPath("/test/foo"); - client.create().forPath("/test/bar"); - client.create().forPath("/test/snafu", "original".getBytes()); - - final CountDownLatch addedLatch = new CountDownLatch(2); - try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", true) ) - { - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) - { - if ( event.getData().getPath().equals("/test/test") ) - { - addedLatch.countDown(); - } - } - else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED ) - { - if ( event.getData().getPath().equals("/test/snafu") ) - { - addedLatch.countDown(); - } - } - } - } - ); - cache.rebuildTestExchanger = new Exchanger<Object>(); - ExecutorService service = Executors.newSingleThreadExecutor(); - final AtomicReference<String> deletedPath = new AtomicReference<String>(); - Future<Object> future = service.submit - ( - new 
Callable<Object>() - { - @Override - public Object call() throws Exception - { - cache.rebuildTestExchanger.exchange(new Object()); - - // simulate another process adding a node while we're rebuilding - client.create().forPath("/test/test"); - - List<ChildData> currentData = cache.getCurrentData(); - Assert.assertTrue(currentData.size() > 0); - - // simulate another process removing a node while we're rebuilding - client.delete().forPath(currentData.get(0).getPath()); - deletedPath.set(currentData.get(0).getPath()); - - cache.rebuildTestExchanger.exchange(new Object()); - - ChildData childData = null; - while ( childData == null ) - { - childData = cache.getCurrentData("/test/snafu"); - Thread.sleep(1000); - } - Assert.assertEquals(childData.getData(), "original".getBytes()); - client.setData().forPath("/test/snafu", "grilled".getBytes()); - - cache.rebuildTestExchanger.exchange(new Object()); - - return null; - } - } - ); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - future.get(); - - Assert.assertTrue(timing.awaitLatch(addedLatch)); - Assert.assertNotNull(cache.getCurrentData("/test/test")); - Assert.assertNull(cache.getCurrentData(deletedPath.get())); - Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes()); - } - } - finally - { - TestCleanState.closeAndTestClean(client); - } - } - - // see https://github.com/Netflix/curator/issues/27 - was caused by not comparing old->new data - //@Test - public void testIssue27() throws Exception - { - PathChildrenCache cache = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - client.create().forPath("/base"); - client.create().forPath("/base/a"); - client.create().forPath("/base/b"); - client.create().forPath("/base/c"); - - client.getChildren().forPath("/base"); - - final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList(); - 
final Semaphore semaphore = new Semaphore(0); - cache = new PathChildrenCache(client, "/base", true); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - events.add(event.getType()); - semaphore.release(); - } - } - ); - cache.start(); - - Assert.assertTrue(timing.acquireSemaphore(semaphore, 3)); - - client.delete().forPath("/base/a"); - Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); - - client.create().forPath("/base/a"); - Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); - - List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList - ( - PathChildrenCacheEvent.Type.CHILD_ADDED, - PathChildrenCacheEvent.Type.CHILD_ADDED, - PathChildrenCacheEvent.Type.CHILD_ADDED, - PathChildrenCacheEvent.Type.CHILD_REMOVED, - PathChildrenCacheEvent.Type.CHILD_ADDED - ); - Assert.assertEquals(expected, events); - } - finally - { - CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - // test Issue 27 using new rebuild() method - //@Test - public void testIssue27Alt() throws Exception - { - PathChildrenCache cache = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - client.create().forPath("/base"); - client.create().forPath("/base/a"); - client.create().forPath("/base/b"); - client.create().forPath("/base/c"); - - client.getChildren().forPath("/base"); - - final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList(); - final Semaphore semaphore = new Semaphore(0); - cache = new PathChildrenCache(client, "/base", true); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - events.add(event.getType()); - 
semaphore.release(); - } - } - ); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - - client.delete().forPath("/base/a"); - Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); - - client.create().forPath("/base/a"); - Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); - - List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList - ( - PathChildrenCacheEvent.Type.CHILD_REMOVED, - PathChildrenCacheEvent.Type.CHILD_ADDED - ); - Assert.assertEquals(expected, events); - } - finally - { - CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testKilledSession() throws Exception - { - PathChildrenCache cache = null; - CuratorFramework client = null; - try - { - client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - client.create().forPath("/test"); - - cache = new PathChildrenCache(client, "/test", true); - cache.start(); - - final CountDownLatch childAddedLatch = new CountDownLatch(1); - final CountDownLatch lostLatch = new CountDownLatch(1); - final CountDownLatch reconnectedLatch = new CountDownLatch(1); - final CountDownLatch removedLatch = new CountDownLatch(1); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) - { - childAddedLatch.countDown(); - } - else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ) - { - lostLatch.countDown(); - } - else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ) - { - reconnectedLatch.countDown(); - } - else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ) - { - removedLatch.countDown(); - } - } - } - ); - - client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", 
"data".getBytes()); - Assert.assertTrue(timing.awaitLatch(childAddedLatch)); - - KillSession.kill(client.getZookeeperClient().getZooKeeper()); - Assert.assertTrue(timing.awaitLatch(lostLatch)); - Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); - Assert.assertTrue(timing.awaitLatch(removedLatch)); - } - finally - { - CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testModes() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - client.create().forPath("/test"); - - for ( boolean cacheData : new boolean[]{false, true} ) - { - internalTestMode(client, cacheData); - - client.delete().forPath("/test/one"); - client.delete().forPath("/test/two"); - } - } - finally - { - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testRebuildNode() throws Exception - { - PathChildrenCache cache = null; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - try - { - client.start(); - client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes()); - - final CountDownLatch latch = new CountDownLatch(1); - final AtomicInteger counter = new AtomicInteger(); - final Semaphore semaphore = new Semaphore(1); - cache = new PathChildrenCache(client, "/test", true) - { - @Override - void getDataAndStat(String fullPath) throws Exception - { - semaphore.acquire(); - counter.incrementAndGet(); - super.getDataAndStat(fullPath); - latch.countDown(); - } - }; - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - - Assert.assertTrue(timing.awaitLatch(latch)); - - int saveCounter = counter.get(); - client.setData().forPath("/test/one", "alt".getBytes()); - cache.rebuildNode("/test/one"); - 
Assert.assertEquals(cache.getCurrentData("/test/one").getData(), "alt".getBytes()); - Assert.assertEquals(saveCounter, counter.get()); - - semaphore.release(1000); - timing.sleepABit(); - } - finally - { - CloseableUtils.closeQuietly(cache); - TestCleanState.closeAndTestClean(client); - } - } - - private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception - { - try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData) ) - { - final CountDownLatch latch = new CountDownLatch(2); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) - { - latch.countDown(); - } - } - } - ); - cache.start(); - - client.create().forPath("/test/one", "one".getBytes()); - client.create().forPath("/test/two", "two".getBytes()); - Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); - - for ( ChildData data : cache.getCurrentData() ) - { - if ( cacheData ) - { - Assert.assertNotNull(data.getData()); - Assert.assertNotNull(data.getStat()); - } - else - { - Assert.assertNull(data.getData()); - Assert.assertNotNull(data.getStat()); - } - } - } - } - - @Test - public void testBasics() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - client.create().forPath("/test"); - - final BlockingQueue<CacheEvent> events = new LinkedBlockingQueue<>(); - try ( CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() ) - { - cache.getListenable().addListener - ( - new CacheListener() - { - @Override - public void process(CacheEvent event, String path) - { - if ( path.equals("/test/one") ) - { - events.offer(event); - } - } - } - ); - cache.start(); - - 
client.create().forPath("/test/one", "hey there".getBytes()); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), CacheEvent.NODE_CREATED); - - client.setData().forPath("/test/one", "sup!".getBytes()); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), CacheEvent.NODE_CHANGED); - Assert.assertTrue(cache.exists("/test/one")); - Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!"); - - client.delete().forPath("/test/one"); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), CacheEvent.NODE_DELETED); - } - } - finally - { - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testBasicsOnTwoCachesWithSameExecutor() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - client.create().forPath("/test"); - - final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); - final ExecutorService exec = Executors.newSingleThreadExecutor(); - try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) ) - { - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - if ( event.getData().getPath().equals("/test/one") ) - { - events.offer(event.getType()); - } - } - } - ); - cache.start(); - - final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); - try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) ) - { - cache2.getListenable().addListener( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) - throws 
Exception - { - if ( event.getData().getPath().equals("/test/one") ) - { - events2.offer(event.getType()); - } - } - } - ); - cache2.start(); - - client.create().forPath("/test/one", "hey there".getBytes()); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); - Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); - - client.setData().forPath("/test/one", "sup!".getBytes()); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); - Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); - Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); - Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!"); - - client.delete().forPath("/test/one"); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); - Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); - } - } - } - finally - { - TestCleanState.closeAndTestClean(client); - } - } - - //@Test - public void testDeleteNodeAfterCloseDoesntCallExecutor() - throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); - try - { - client.create().forPath("/test"); - - final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor()); - try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) ) - { - cache.start(); - client.create().forPath("/test/one", "hey there".getBytes()); - - cache.rebuild(); - 
Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); - Assert.assertTrue(exec.isExecuteCalled()); - - exec.setExecuteCalled(false); - } - Assert.assertFalse(exec.isExecuteCalled()); - - client.delete().forPath("/test/one"); - timing.sleepABit(); - Assert.assertFalse(exec.isExecuteCalled()); - } - finally - { - TestCleanState.closeAndTestClean(client); - } - - } - - /** - * Tests the case where there's an outstanding operation being executed when the cache is - * shut down. See CURATOR-121, this was causing misleading warning messages to be logged. - */ - //@Test - public void testInterruptedOperationOnShutdown() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1)); - client.start(); - - try - { - final CountDownLatch latch = new CountDownLatch(1); - try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) { - @Override - protected void handleException(Throwable e) - { - latch.countDown(); - } - } ) - { - cache.start(); - - cache.offerOperation(new Operation() - { - - @Override - public void invoke() throws Exception - { - Thread.sleep(5000); - } - }); - - Thread.sleep(1000); - - } - - latch.await(5, TimeUnit.SECONDS); - - Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred"); - } - finally - { - TestCleanState.closeAndTestClean(client); - } - } -}