Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 301e98926 -> e0a27daef


Make ModeledNodeCache use the same listener as the others


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e0a27dae
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e0a27dae
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e0a27dae

Branch: refs/heads/CURATOR-397
Commit: e0a27daef0b4dd363171b9ffde6eb4230cd8efd6
Parents: 301e989
Author: randgalt <randg...@apache.org>
Authored: Sun Apr 9 07:03:50 2017 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sun Apr 9 07:03:50 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/cache/NodeCache.java      |   5 +
 .../details/recipes/ModeledNodeCacheImpl.java   | 114 ++++++++++++++++++-
 .../recipes/ModeledPathChildrenCacheImpl.java   |   2 +-
 .../details/recipes/ModeledTreeCacheImpl.java   |   2 +-
 .../async/modeled/recipes/ModeledNodeCache.java |  10 +-
 .../modeled/recipes/TestModeledCaches.java      |  36 +++---
 6 files changed, 144 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 3a490ca..9687e1b 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -145,6 +145,11 @@ public class NodeCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
     }
 
+    public CuratorFramework getClient()  // accessor for this cache's client field
+    {
+        return client;
+    }
+
     /**
      * Start the cache. The cache is not started automatically. You must call 
this method.
      *

http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java
index 4509c73..e7bfb7c 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledNodeCacheImpl.java
@@ -18,24 +18,35 @@
  */
 package org.apache.curator.x.async.modeled.details.recipes;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.recipes.ModeledCacheEvent;
+import org.apache.curator.x.async.modeled.recipes.ModeledCacheEventType;
+import org.apache.curator.x.async.modeled.recipes.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.recipes.ModeledCachedNode;
 import org.apache.curator.x.async.modeled.recipes.ModeledNodeCache;
 import org.apache.zookeeper.data.Stat;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
-public class ModeledNodeCacheImpl<T> implements ModeledNodeCache<T>
+public class ModeledNodeCacheImpl<T> implements ModeledNodeCache<T>, 
ConnectionStateListener
 {
     private final NodeCache cache;
     private final ModelSerializer<T> serializer;
     private final ZPath path;
+    private final Map<ModeledCacheListener<T>, NodeCacheListener> listenerMap 
= new ConcurrentHashMap<>();
 
     public ModeledNodeCacheImpl(NodeCache cache, ModelSerializer<T> serializer)
     {
@@ -45,6 +56,58 @@ public class ModeledNodeCacheImpl<T> implements 
ModeledNodeCache<T>
     }
 
     @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState)
+    {
+        ModeledCacheEventType mappedType;
+        switch ( newState )
+        {
+            default:
+            {
+                mappedType = null;
+                break;
+            }
+
+            case RECONNECTED:
+            case CONNECTED:
+            {
+                mappedType = ModeledCacheEventType.CONNECTION_RECONNECTED;
+                break;
+            }
+
+            case SUSPENDED:
+            {
+                mappedType = ModeledCacheEventType.CONNECTION_SUSPENDED;
+                break;
+            }
+
+            case LOST:
+            {
+                mappedType = ModeledCacheEventType.CONNECTION_LOST;
+                break;
+            }
+        }
+
+        if ( mappedType != null )
+        {
+            ModeledCacheEvent<T> event = new ModeledCacheEvent<T>()
+            {
+                @Override
+                public ModeledCacheEventType getType()
+                {
+                    return mappedType;
+                }
+
+                @Override
+                public Optional<ModeledCachedNode<T>> getNode()
+                {
+                    return Optional.empty();
+                }
+            };
+            listenerMap.keySet().forEach(l -> l.event(null));
+        }
+    }
+
+    @Override
     public NodeCache unwrap()
     {
         return cache;
@@ -61,11 +124,13 @@ public class ModeledNodeCacheImpl<T> implements 
ModeledNodeCache<T>
         {
             throw new RuntimeException("Could not start", e);
         }
+        cache.getClient().getConnectionStateListenable().addListener(this);
     }
 
     @Override
     public void start(boolean buildInitial)
     {
+        cache.getClient().getConnectionStateListenable().removeListener(this);
         try
         {
             cache.start(buildInitial);
@@ -90,9 +155,52 @@ public class ModeledNodeCacheImpl<T> implements 
ModeledNodeCache<T>
     }
 
     @Override
-    public Listenable<NodeCacheListener> getListenable()
+    public Listenable<ModeledCacheListener<T>> getListenable()
     {
-        return cache.getListenable();
+        return new Listenable<ModeledCacheListener<T>>()  // wraps cache.getListenable()
+        {
+            @Override
+            public void addListener(ModeledCacheListener<T> listener)
+            {
+                addListener(listener, MoreExecutors.sameThreadExecutor());  // default executor
+            }
+
+            @Override
+            public void addListener(ModeledCacheListener<T> listener, Executor 
executor)
+            {
+                NodeCacheListener nodeCacheListener = () ->  // adapter registered on the NodeCache
+                {
+                    Optional<ModeledCachedNode<T>> currentData = 
getCurrentData();
+                    ModeledCacheEvent<T> event = new ModeledCacheEvent<T>()
+                    {
+                        @Override
+                        public ModeledCacheEventType getType()  // UPDATED if data present, else REMOVED
+                        {
+                            return currentData.isPresent() ? 
ModeledCacheEventType.NODE_UPDATED : ModeledCacheEventType.NODE_REMOVED;
+                        }
+
+                        @Override
+                        public Optional<ModeledCachedNode<T>> getNode()
+                        {
+                            return currentData;
+                        }
+                    };
+                    listener.event(event);
+                };
+                listenerMap.put(listener, nodeCacheListener);  // kept so removeListener can find the adapter
+                cache.getListenable().addListener(nodeCacheListener, executor);
+            }
+
+            @Override
+            public void removeListener(ModeledCacheListener<T> listener)
+            {
+                NodeCacheListener nodeCacheListener = 
listenerMap.remove(listener);
+                if ( nodeCacheListener != null )  // null when the listener is not in listenerMap
+                {
+                    cache.getListenable().removeListener(nodeCacheListener);
+                }
+            }
+        };
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
index a6777a6..ac7ebbb 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
 public class ModeledPathChildrenCacheImpl<T> implements 
ModeledPathChildrenCache<T>
 {
     private final PathChildrenCache cache;
-    private final Map<ModeledCacheListener, PathChildrenCacheListener> 
listenerMap = new ConcurrentHashMap<>();
+    private final Map<ModeledCacheListener<T>, PathChildrenCacheListener> 
listenerMap = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
 
     public ModeledPathChildrenCacheImpl(PathChildrenCache cache, 
ModelSerializer<T> serializer)

http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
index d2b2dab..f5d4c1e 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
@@ -47,7 +47,7 @@ import static 
org.apache.curator.x.async.modeled.details.recipes.ModeledPathChil
 public class ModeledTreeCacheImpl<T> implements ModeledTreeCache<T>
 {
     private final TreeCache cache;
-    private final Map<ModeledCacheListener, TreeCacheListener> listenerMap = 
new ConcurrentHashMap<>();
+    private final Map<ModeledCacheListener<T>, TreeCacheListener> listenerMap 
= new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
 
     public ModeledTreeCacheImpl(TreeCache cache, ModelSerializer<T> serializer)

http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java
index 97df1d6..a5fb598 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledNodeCache.java
@@ -20,7 +20,6 @@ package org.apache.curator.x.async.modeled.recipes;
 
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.details.recipes.ModeledNodeCacheImpl;
 import java.io.Closeable;
@@ -67,9 +66,14 @@ public interface ModeledNodeCache<T> extends Closeable
     void rebuild();
 
     /**
-     * Forwards to {@link 
org.apache.curator.framework.recipes.cache.NodeCache#getListenable()}
+     * Return the listener container so that you can add/remove listeners. 
Note:
+     * {@link 
org.apache.curator.x.async.modeled.recipes.ModeledCacheEventType#INITIALIZED}
+     * and {@link 
org.apache.curator.x.async.modeled.recipes.ModeledCacheEventType#NODE_ADDED} 
are not
+     * used.
+     *
+     * @return listener container
      */
-    Listenable<NodeCacheListener> getListenable();
+    Listenable<ModeledCacheListener<T>> getListenable();
 
     /**
      * Return the modeled current data. There are no guarantees of accuracy. 
This is

http://git-wip-us.apache.org/repos/asf/curator/blob/e0a27dae/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
index bacd516..ce65f9c 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
@@ -38,7 +38,6 @@ import org.testng.annotations.Test;
 import java.math.BigInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 public class TestModeledCaches extends CompletableBaseClassForTests
@@ -79,34 +78,37 @@ public class TestModeledCaches extends 
CompletableBaseClassForTests
         {
             cache.start(true);
 
-            Semaphore changesAvailable = new Semaphore(0);
-            cache.getListenable().addListener(changesAvailable::release);
+            BlockingQueue<ModeledCacheEvent<TestModel>> events = new 
LinkedBlockingQueue<>();
+            ModeledCacheListener<TestModel> listener = events::add;
+            cache.getListenable().addListener(listener);
 
             TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.TEN);
             TestModel model2 = new TestModel("d", "e", "f", 10, 
BigInteger.ONE);
 
             Stat stat = new Stat();
             modeled.create(model1, stat);
-            Assert.assertTrue(timing.acquireSemaphore(changesAvailable));
-            Assert.assertTrue(cache.getCurrentData().isPresent());
-            
Assert.assertTrue(cache.getCurrentData().get().getData().isPresent());
-            Assert.assertEquals(cache.getCurrentData().get().getPath(), path);
-            Assert.assertEquals(cache.getCurrentData().get().getData().get(), 
model1);
-            Assert.assertEquals(cache.getCurrentData().get().getStat(), stat);
+            ModeledCacheEvent<TestModel> event = 
events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(event);
+            Assert.assertEquals(event.getType(), 
ModeledCacheEventType.NODE_UPDATED);
+            Assert.assertTrue(event.getNode().isPresent());
+            Assert.assertTrue(event.getNode().get().getData().isPresent());
+            Assert.assertEquals(event.getNode().get().getPath(), path);
+            Assert.assertEquals(event.getNode().get().getData().get(), model1);
+            Assert.assertEquals(event.getNode().get().getStat(), stat);
 
             timing.sleepABit();
-            Assert.assertEquals(changesAvailable.availablePermits(), 0);
+            Assert.assertEquals(events.size(), 0);
 
             modeled.update(model2);
-            Assert.assertTrue(timing.acquireSemaphore(changesAvailable));
-            Assert.assertTrue(cache.getCurrentData().isPresent());
-            
Assert.assertTrue(cache.getCurrentData().get().getData().isPresent());
-            Assert.assertEquals(cache.getCurrentData().get().getPath(), path);
-            Assert.assertEquals(cache.getCurrentData().get().getData().get(), 
model2);
+            event = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            Assert.assertTrue(event.getNode().isPresent());
+            Assert.assertTrue(event.getNode().get().getData().isPresent());
+            Assert.assertEquals(event.getNode().get().getPath(), path);
+            Assert.assertEquals(event.getNode().get().getData().get(), model2);
 
             modeled.delete();
-            Assert.assertTrue(timing.acquireSemaphore(changesAvailable));
-            Assert.assertFalse(cache.getCurrentData().isPresent());
+            event = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            Assert.assertFalse(event.getNode().isPresent());
         }
     }
 

Reply via email to