Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 d518417e5 -> 301e98926


finished initial tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/301e9892
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/301e9892
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/301e9892

Branch: refs/heads/CURATOR-397
Commit: 301e989268a81537645bbdc3c71af4166550abd1
Parents: d518417
Author: randgalt <randg...@apache.org>
Authored: Sun Apr 9 06:40:34 2017 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sun Apr 9 06:40:34 2017 -0500

----------------------------------------------------------------------
 .../x/async/modeled/JacksonModelSerializer.java | 19 ++++++
 .../recipes/ModeledPathChildrenCacheImpl.java   |  8 ++-
 .../details/recipes/ModeledTreeCacheImpl.java   |  7 ++-
 .../modeled/recipes/ModeledCacheListener.java   | 57 ++++++++++++++++++
 .../modeled/recipes/TestModeledCaches.java      | 63 ++++++++++++++++++++
 5 files changed, 149 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java
index 429fea7..6d16c86 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Objects;
 
 /**
  * Model serializer that uses Jackson for JSON serialization. <strong>IMPORTANT: </strong>
@@ -77,6 +78,24 @@ public class JacksonModelSerializer<T> implements ModelSerializer<T>
         writer = mapper.writerFor(type);
     }
 
+    public JacksonModelSerializer(ObjectMapper mapper, JavaType type)
+    {
+        reader = mapper.readerFor(type);
+        writer = mapper.writerFor(type);
+    }
+
+    public JacksonModelSerializer(ObjectMapper mapper, TypeReference type)
+    {
+        reader = mapper.readerFor(type);
+        writer = mapper.writerFor(type);
+    }
+
+    public JacksonModelSerializer(ObjectReader reader, ObjectWriter writer)
+    {
+        this.reader = Objects.requireNonNull(reader, "reader cannot be null");
+        this.writer = Objects.requireNonNull(writer, "writer cannot be null");
+    }
+
     @Override
     public byte[] serialize(T model)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
index 0153806..a6777a6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java
@@ -126,18 +126,20 @@ public class ModeledPathChildrenCacheImpl<T> implements ModeledPathChildrenCache
             public void addListener(ModeledCacheListener<T> listener, Executor executor)
             {
                 PathChildrenCacheListener pathChildrenCacheListener = (client, event) -> {
+                    ModeledCacheEventType eventType = toType(event.getType());
+                    Optional<ModeledCachedNode<T>> node = Optional.ofNullable(from(serializer, event.getData()));
                     ModeledCacheEvent<T> modeledEvent = new ModeledCacheEvent<T>()
                     {
                         @Override
                         public ModeledCacheEventType getType()
                         {
-                            return toType(event.getType());
+                            return eventType;
                         }
 
                         @Override
                         public Optional<ModeledCachedNode<T>> getNode()
                         {
-                            return Optional.ofNullable(from(serializer, event.getData()));
+                            return node;
                         }
                     };
                     listener.event(modeledEvent);
@@ -215,7 +217,7 @@ public class ModeledPathChildrenCacheImpl<T> implements ModeledPathChildrenCache
         {
             return null;
         }
-        T model = (data.getData() != null) ? serializer.deserialize(data.getData()) : null;
+        T model = ((data.getData() != null) && (data.getData().length > 0)) ? serializer.deserialize(data.getData()) : null;
         return new ModeledCachedNodeImpl<>(ZPath.parse(data.getPath()), model, data.getStat());
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
index 8cf156b..d2b2dab 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java
@@ -96,20 +96,23 @@ public class ModeledTreeCacheImpl<T> implements ModeledTreeCache<T>
             public void addListener(ModeledCacheListener<T> listener, Executor executor)
             {
                 TreeCacheListener treeCacheListener = (client, event) -> {
+                    ModeledCacheEventType eventType = toType(event.getType());
+                    Optional<ModeledCachedNode<T>> node = Optional.ofNullable(from(serializer, event.getData()));
                     ModeledCacheEvent<T> wrappedEvent = new ModeledCacheEvent<T>()
                     {
                         @Override
                         public ModeledCacheEventType getType()
                         {
-                            return toType(event.getType());
+                            return eventType;
                         }
 
                         @Override
                         public Optional<ModeledCachedNode<T>> getNode()
                         {
-                            return Optional.ofNullable(from(serializer, event.getData()));
+                            return node;
                         }
                     };
+                    listener.event(wrappedEvent);
                 };
                 listenerMap.put(listener, treeCacheListener);
                 cache.getListenable().addListener(treeCacheListener, executor);

http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java
index 71a035f..8d6d2cc 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java
@@ -18,10 +18,67 @@
  */
 package org.apache.curator.x.async.modeled.recipes;
 
+import java.util.function.Predicate;
+
 /**
  * Event listener
  */
+@FunctionalInterface
 public interface ModeledCacheListener<T>
 {
+    /**
+     * Receive an event
+     *
+     * @param event the event
+     */
     void event(ModeledCacheEvent<T> event);
+
+    /**
+     * Wrap this listener with a filter
+     *
+     * @param filter test for events. Only events that pass the filter are sent to the listener
+     * @return filtered version of this listener
+     */
+    static <T> ModeledCacheListener<T> filtered(ModeledCacheListener<T> listener, Predicate<ModeledCacheEvent<T>> filter)
+    {
+        return event -> {
+            if ( filter.test(event) )
+            {
+                listener.event(event);
+            }
+        };
+    }
+
+    /**
+     * Filters out all but CRUD events
+     *
+     * @return predicate
+     */
+    static <T> Predicate<ModeledCacheEvent<T>> nodeEventFilter()
+    {
+        return event -> (event.getType() == ModeledCacheEventType.NODE_ADDED)
+            || (event.getType() == ModeledCacheEventType.NODE_UPDATED)
+            || (event.getType() == ModeledCacheEventType.NODE_REMOVED)
+            ;
+    }
+
+    /**
+     * Filters out all but {@link ModeledCacheEventType#NODE_REMOVED} events
+     *
+     * @return predicate
+     */
+    static <T> Predicate<ModeledCacheEvent<T>> nodeRemovedFilter()
+    {
+        return event -> event.getType() == ModeledCacheEventType.NODE_REMOVED;
+    }
+
+    /**
+     * Filters out all but events that have valid model instances
+     *
+     * @return predicate
+     */
+    static <T> Predicate<ModeledCacheEvent<T>> hasModelFilter()
+    {
+        return event -> event.getNode().isPresent() && event.getNode().get().getData().isPresent();
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
index 65b4e37..bacd516 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.CompletableBaseClassForTests;
@@ -130,6 +131,7 @@ public class TestModeledCaches extends CompletableBaseClassForTests
             Assert.assertNotNull(event1);
             Assert.assertNotNull(event2);
             Assert.assertEquals(event1.getType(), ModeledCacheEventType.NODE_ADDED);
+            Assert.assertEquals(event2.getType(), ModeledCacheEventType.NODE_ADDED);
             Assert.assertEquals(event1.getNode().isPresent() ? event1.getNode().get().getData().orElse(null) : null, model1);
             Assert.assertEquals(event2.getNode().isPresent() ? event2.getNode().get().getData().orElse(null) : null, model2);
             Assert.assertEquals(event1.getNode().get().getPath(), path.at("1"));
@@ -153,4 +155,65 @@ public class TestModeledCaches extends CompletableBaseClassForTests
             Assert.assertNull(events.poll(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS));  // listener is removed - shouldn't get an event
         }
     }
+
+    @Test
+    public void testModeledPathChildrenCacheWithoutData() throws InterruptedException
+    {
+        try ( ModeledPathChildrenCache<TestModel> cache = ModeledPathChildrenCache.wrap(new PathChildrenCache(client, path.fullPath(), false), serializer) )
+        {
+            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+            BlockingQueue<ModeledCacheEvent<TestModel>> events = new LinkedBlockingQueue<>();
+            ModeledCacheListener<TestModel> listener = events::add;
+            cache.getListenable().addListener(listener);
+
+            TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.TEN);
+            TestModel model2 = new TestModel("d", "e", "f", 10, BigInteger.ONE);
+
+            modeled.at("1").create(model1).thenApply(__ -> modeled.at("2").create(model2));
+            ModeledCacheEvent<TestModel> event1 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            ModeledCacheEvent<TestModel> event2 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(event1);
+            Assert.assertNotNull(event2);
+            Assert.assertEquals(event1.getType(), ModeledCacheEventType.NODE_ADDED);
+            Assert.assertTrue(event1.getNode().isPresent());
+            Assert.assertTrue(event2.getNode().isPresent());
+            Assert.assertFalse(event1.getNode().get().getData().isPresent());
+            Assert.assertFalse(event2.getNode().get().getData().isPresent());
+        }
+    }
+
+    @Test
+    public void testModeledTreeCacheWithData() throws Exception
+    {
+        try (ModeledTreeCache<TestModel> cache = ModeledTreeCache.wrap(TreeCache.newBuilder(client, path.fullPath()).build(),serializer) )
+        {
+            BlockingQueue<ModeledCacheEvent<TestModel>> events = new LinkedBlockingQueue<>();
+            ModeledCacheListener<TestModel> listener = ModeledCacheListener.filtered(events::add, ModeledCacheListener.<TestModel>nodeRemovedFilter().or(ModeledCacheListener.hasModelFilter()));
+            cache.getListenable().addListener(listener);
+
+            cache.start();
+
+            TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.TEN);
+            TestModel model2 = new TestModel("d", "e", "f", 10, BigInteger.ONE);
+            TestModel model3 = new TestModel("g", "h", "i", 100, BigInteger.ZERO);
+
+            modeled.at("1").create(model1).thenApply(__ -> modeled.at("1").at("2").create(model2).thenApply(___ -> modeled.at("1").at("2").at("3").create(model3)));
+            ModeledCacheEvent<TestModel> event1 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            ModeledCacheEvent<TestModel> event2 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            ModeledCacheEvent<TestModel> event3 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(event1);
+            Assert.assertNotNull(event2);
+            Assert.assertNotNull(event3);
+            Assert.assertEquals(event1.getType(), ModeledCacheEventType.NODE_ADDED);
+            Assert.assertEquals(event2.getType(), ModeledCacheEventType.NODE_ADDED);
+            Assert.assertEquals(event3.getType(), ModeledCacheEventType.NODE_ADDED);
+            Assert.assertEquals(event1.getNode().isPresent() ? event1.getNode().get().getData().orElse(null) : null, model1);
+            Assert.assertEquals(event2.getNode().isPresent() ? event2.getNode().get().getData().orElse(null) : null, model2);
+            Assert.assertEquals(event3.getNode().isPresent() ? event3.getNode().get().getData().orElse(null) : null, model3);
+            Assert.assertEquals(event1.getNode().get().getPath(), path.at("1"));
+            Assert.assertEquals(event2.getNode().get().getPath(), path.at("1").at("2"));
+            Assert.assertEquals(event3.getNode().get().getPath(), path.at("1").at("2").at("3"));
+        }
+    }
 }

Reply via email to