Repository: curator Updated Branches: refs/heads/CURATOR-397 301e98926 -> e0a27daef
Make ModeledNodeCache use the same listener as the others Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e0a27dae Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e0a27dae Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e0a27dae Branch: refs/heads/CURATOR-397 Commit: e0a27daef0b4dd363171b9ffde6eb4230cd8efd6 Parents: 301e989 Author: randgalt <randg...@apache.org> Authored: Sun Apr 9 07:03:50 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Apr 9 07:03:50 2017 -0500 ---------------------------------------------------------------------- .../framework/recipes/cache/NodeCache.java | 5 + .../details/recipes/ModeledNodeCacheImpl.java | 114 ++++++++++++++++++- .../recipes/ModeledPathChildrenCacheImpl.java | 2 +- .../details/recipes/ModeledTreeCacheImpl.java | 2 +- .../async/modeled/recipes/ModeledNodeCache.java | 10 +- .../modeled/recipes/TestModeledCaches.java | 36 +++--- 6 files changed, 144 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java index 3a490ca..9687e1b 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java @@ -145,6 +145,11 @@ public class NodeCache implements Closeable this.dataIsCompressed = dataIsCompressed; } + public CuratorFramework getClient() + { + return client; + } + /** * Start the cache. The cache is not started automatically. 
You must call this method. * http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java index 4509c73..e7bfb7c 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java @@ -18,24 +18,35 @@ */ package org.apache.curator.x.async.modeled.details.recipes; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.async.modeled.ModelSerializer; import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.recipes.ModeledCacheEvent; +import org.apache.curator.x.async.modeled.recipes.ModeledCacheEventType; +import org.apache.curator.x.async.modeled.recipes.ModeledCacheListener; import org.apache.curator.x.async.modeled.recipes.ModeledCachedNode; import org.apache.curator.x.async.modeled.recipes.ModeledNodeCache; import org.apache.zookeeper.data.Stat; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; 
+import java.util.concurrent.Executor; -public class ModeledNodeCacheImpl<T> implements ModeledNodeCache<T> +public class ModeledNodeCacheImpl<T> implements ModeledNodeCache<T>, ConnectionStateListener { private final NodeCache cache; private final ModelSerializer<T> serializer; private final ZPath path; + private final Map<ModeledCacheListener<T>, NodeCacheListener> listenerMap = new ConcurrentHashMap<>(); public ModeledNodeCacheImpl(NodeCache cache, ModelSerializer<T> serializer) { @@ -45,6 +56,58 @@ public class ModeledNodeCacheImpl<T> implements ModeledNodeCache<T> } @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + ModeledCacheEventType mappedType; + switch ( newState ) + { + default: + { + mappedType = null; + break; + } + + case RECONNECTED: + case CONNECTED: + { + mappedType = ModeledCacheEventType.CONNECTION_RECONNECTED; + break; + } + + case SUSPENDED: + { + mappedType = ModeledCacheEventType.CONNECTION_SUSPENDED; + break; + } + + case LOST: + { + mappedType = ModeledCacheEventType.CONNECTION_LOST; + break; + } + } + + if ( mappedType != null ) + { + ModeledCacheEvent<T> event = new ModeledCacheEvent<T>() + { + @Override + public ModeledCacheEventType getType() + { + return mappedType; + } + + @Override + public Optional<ModeledCachedNode<T>> getNode() + { + return Optional.empty(); + } + }; + listenerMap.keySet().forEach(l -> l.event(event)); /* deliver the connection event built above; passing null would hand listeners a null event */ + } + } + + @Override public NodeCache unwrap() { return cache; @@ -61,11 +124,13 @@ public class ModeledNodeCacheImpl<T> implements ModeledNodeCache<T> { throw new RuntimeException("Could not start", e); } + cache.getClient().getConnectionStateListenable().addListener(this); } @Override public void start(boolean buildInitial) { + cache.getClient().getConnectionStateListenable().addListener(this); /* register for connection events, matching start(); removing here would leave this overload without connection notifications */ try { cache.start(buildInitial); @@ -90,9 +155,52 @@ public class ModeledNodeCacheImpl<T> implements ModeledNodeCache<T> } @Override - public Listenable<NodeCacheListener> 
getListenable() + public Listenable<ModeledCacheListener<T>> getListenable() { - return cache.getListenable(); + return new Listenable<ModeledCacheListener<T>>() + { + @Override + public void addListener(ModeledCacheListener<T> listener) + { + addListener(listener, MoreExecutors.sameThreadExecutor()); + } + + @Override + public void addListener(ModeledCacheListener<T> listener, Executor executor) + { + NodeCacheListener nodeCacheListener = () -> + { + Optional<ModeledCachedNode<T>> currentData = getCurrentData(); + ModeledCacheEvent<T> event = new ModeledCacheEvent<T>() + { + @Override + public ModeledCacheEventType getType() + { + return currentData.isPresent() ? ModeledCacheEventType.NODE_UPDATED : ModeledCacheEventType.NODE_REMOVED; + } + + @Override + public Optional<ModeledCachedNode<T>> getNode() + { + return currentData; + } + }; + listener.event(event); + }; + listenerMap.put(listener, nodeCacheListener); + cache.getListenable().addListener(nodeCacheListener, executor); + } + + @Override + public void removeListener(ModeledCacheListener<T> listener) + { + NodeCacheListener nodeCacheListener = listenerMap.remove(listener); + if ( nodeCacheListener != null ) + { + cache.getListenable().removeListener(nodeCacheListener); + } + } + }; } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java index a6777a6..ac7ebbb 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java +++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java @@ -44,7 +44,7 @@ import java.util.stream.Collectors; public class ModeledPathChildrenCacheImpl<T> implements ModeledPathChildrenCache<T> { private final PathChildrenCache cache; - private final Map<ModeledCacheListener, PathChildrenCacheListener> listenerMap = new ConcurrentHashMap<>(); + private final Map<ModeledCacheListener<T>, PathChildrenCacheListener> listenerMap = new ConcurrentHashMap<>(); private final ModelSerializer<T> serializer; public ModeledPathChildrenCacheImpl(PathChildrenCache cache, ModelSerializer<T> serializer) http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java index d2b2dab..f5d4c1e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java @@ -47,7 +47,7 @@ import static org.apache.curator.x.async.modeled.details.recipes.ModeledPathChil public class ModeledTreeCacheImpl<T> implements ModeledTreeCache<T> { private final TreeCache cache; - private final Map<ModeledCacheListener, TreeCacheListener> listenerMap = new ConcurrentHashMap<>(); + private final Map<ModeledCacheListener<T>, TreeCacheListener> listenerMap = new ConcurrentHashMap<>(); private final ModelSerializer<T> serializer; public ModeledTreeCacheImpl(TreeCache cache, ModelSerializer<T> serializer) 
http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java index 97df1d6..a5fb598 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java @@ -20,7 +20,6 @@ package org.apache.curator.x.async.modeled.recipes; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.recipes.cache.NodeCache; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.x.async.modeled.ModelSerializer; import org.apache.curator.x.async.modeled.details.recipes.ModeledNodeCacheImpl; import java.io.Closeable; @@ -67,9 +66,14 @@ public interface ModeledNodeCache<T> extends Closeable void rebuild(); /** - * Forwards to {@link org.apache.curator.framework.recipes.cache.NodeCache#getListenable()} + * Return the listener container so that you can add/remove listeners. Note: + * {@link org.apache.curator.x.async.modeled.recipes.ModeledCacheEventType#INITIALIZED} + * and {@link org.apache.curator.x.async.modeled.recipes.ModeledCacheEventType#NODE_ADDED} are not + * used. + * + * @return listener container */ - Listenable<NodeCacheListener> getListenable(); + Listenable<ModeledCacheListener<T>> getListenable(); /** * Return the modeled current data. There are no guarantees of accuracy. 
This is http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java index bacd516..ce65f9c 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java @@ -38,7 +38,6 @@ import org.testng.annotations.Test; import java.math.BigInteger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class TestModeledCaches extends CompletableBaseClassForTests @@ -79,34 +78,37 @@ public class TestModeledCaches extends CompletableBaseClassForTests { cache.start(true); - Semaphore changesAvailable = new Semaphore(0); - cache.getListenable().addListener(changesAvailable::release); + BlockingQueue<ModeledCacheEvent<TestModel>> events = new LinkedBlockingQueue<>(); + ModeledCacheListener<TestModel> listener = events::add; + cache.getListenable().addListener(listener); TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.TEN); TestModel model2 = new TestModel("d", "e", "f", 10, BigInteger.ONE); Stat stat = new Stat(); modeled.create(model1, stat); - Assert.assertTrue(timing.acquireSemaphore(changesAvailable)); - Assert.assertTrue(cache.getCurrentData().isPresent()); - Assert.assertTrue(cache.getCurrentData().get().getData().isPresent()); - Assert.assertEquals(cache.getCurrentData().get().getPath(), path); - Assert.assertEquals(cache.getCurrentData().get().getData().get(), model1); - 
Assert.assertEquals(cache.getCurrentData().get().getStat(), stat); + ModeledCacheEvent<TestModel> event = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertNotNull(event); + Assert.assertEquals(event.getType(), ModeledCacheEventType.NODE_UPDATED); + Assert.assertTrue(event.getNode().isPresent()); + Assert.assertTrue(event.getNode().get().getData().isPresent()); + Assert.assertEquals(event.getNode().get().getPath(), path); + Assert.assertEquals(event.getNode().get().getData().get(), model1); + Assert.assertEquals(event.getNode().get().getStat(), stat); timing.sleepABit(); - Assert.assertEquals(changesAvailable.availablePermits(), 0); + Assert.assertEquals(events.size(), 0); modeled.update(model2); - Assert.assertTrue(timing.acquireSemaphore(changesAvailable)); - Assert.assertTrue(cache.getCurrentData().isPresent()); - Assert.assertTrue(cache.getCurrentData().get().getData().isPresent()); - Assert.assertEquals(cache.getCurrentData().get().getPath(), path); - Assert.assertEquals(cache.getCurrentData().get().getData().get(), model2); + event = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertTrue(event.getNode().isPresent()); + Assert.assertTrue(event.getNode().get().getData().isPresent()); + Assert.assertEquals(event.getNode().get().getPath(), path); + Assert.assertEquals(event.getNode().get().getData().get(), model2); modeled.delete(); - Assert.assertTrue(timing.acquireSemaphore(changesAvailable)); - Assert.assertFalse(cache.getCurrentData().isPresent()); + event = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertFalse(event.getNode().isPresent()); } }