Repository: curator Updated Branches: refs/heads/CURATOR-397 d518417e5 -> 301e98926
finished initial tests Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/301e9892 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/301e9892 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/301e9892 Branch: refs/heads/CURATOR-397 Commit: 301e989268a81537645bbdc3c71af4166550abd1 Parents: d518417 Author: randgalt <randg...@apache.org> Authored: Sun Apr 9 06:40:34 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Apr 9 06:40:34 2017 -0500 ---------------------------------------------------------------------- .../x/async/modeled/JacksonModelSerializer.java | 19 ++++++ .../recipes/ModeledPathChildrenCacheImpl.java | 8 ++- .../details/recipes/ModeledTreeCacheImpl.java | 7 ++- .../modeled/recipes/ModeledCacheListener.java | 57 ++++++++++++++++++ .../modeled/recipes/TestModeledCaches.java | 63 ++++++++++++++++++++ 5 files changed, 149 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java index 429fea7..6d16c86 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/JacksonModelSerializer.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; import java.io.IOException; import java.util.Arrays; +import java.util.Objects; /** * Model serializer that uses Jackson for JSON serialization. <strong>IMPORTANT: </strong> @@ -77,6 +78,24 @@ public class JacksonModelSerializer<T> implements ModelSerializer<T> writer = mapper.writerFor(type); } + public JacksonModelSerializer(ObjectMapper mapper, JavaType type) + { + reader = mapper.readerFor(type); + writer = mapper.writerFor(type); + } + + public JacksonModelSerializer(ObjectMapper mapper, TypeReference type) + { + reader = mapper.readerFor(type); + writer = mapper.writerFor(type); + } + + public JacksonModelSerializer(ObjectReader reader, ObjectWriter writer) + { + this.reader = Objects.requireNonNull(reader, "reader cannot be null"); + this.writer = Objects.requireNonNull(writer, "writer cannot be null"); + } + @Override public byte[] serialize(T model) { http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java index 0153806..a6777a6 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java @@ -126,18 +126,20 @@ public class ModeledPathChildrenCacheImpl<T> implements ModeledPathChildrenCache public void addListener(ModeledCacheListener<T> listener, Executor executor) { PathChildrenCacheListener pathChildrenCacheListener = (client, event) -> { + ModeledCacheEventType eventType = toType(event.getType()); + Optional<ModeledCachedNode<T>> node = Optional.ofNullable(from(serializer, event.getData())); ModeledCacheEvent<T> modeledEvent = new ModeledCacheEvent<T>() { @Override public ModeledCacheEventType getType() { - return toType(event.getType()); + return eventType; } @Override public Optional<ModeledCachedNode<T>> getNode() { - return Optional.ofNullable(from(serializer, event.getData())); + return node; } }; listener.event(modeledEvent); @@ -215,7 +217,7 @@ public class ModeledPathChildrenCacheImpl<T> implements ModeledPathChildrenCache { return null; } - T model = (data.getData() != null) ? serializer.deserialize(data.getData()) : null; + T model = ((data.getData() != null) && (data.getData().length > 0)) ? serializer.deserialize(data.getData()) : null; return new ModeledCachedNodeImpl<>(ZPath.parse(data.getPath()), model, data.getStat()); } http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java index 8cf156b..d2b2dab 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledTreeCacheImpl.java @@ -96,20 +96,23 @@ public class ModeledTreeCacheImpl<T> implements ModeledTreeCache<T> public void addListener(ModeledCacheListener<T> listener, Executor executor) { TreeCacheListener treeCacheListener = (client, event) -> { + ModeledCacheEventType eventType = toType(event.getType()); + Optional<ModeledCachedNode<T>> node = Optional.ofNullable(from(serializer, event.getData())); ModeledCacheEvent<T> wrappedEvent = new ModeledCacheEvent<T>() { @Override public ModeledCacheEventType getType() { - return toType(event.getType()); + return eventType; } @Override public Optional<ModeledCachedNode<T>> getNode() { - return Optional.ofNullable(from(serializer, event.getData())); + return node; } }; + listener.event(wrappedEvent); }; listenerMap.put(listener, treeCacheListener); cache.getListenable().addListener(treeCacheListener, executor); http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java index 71a035f..8d6d2cc 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCacheListener.java @@ -18,10 +18,67 @@ */ package org.apache.curator.x.async.modeled.recipes; +import java.util.function.Predicate; + /** * Event listener */ +@FunctionalInterface public interface ModeledCacheListener<T> { + /** + * Receive an event + * + * @param event the event + */ void event(ModeledCacheEvent<T> event); + + /** + * Wrap this listener with a filter + * + * @param filter test for events. Only events that pass the filter are sent to the listener + * @return filtered version of this listener + */ + static <T> ModeledCacheListener<T> filtered(ModeledCacheListener<T> listener, Predicate<ModeledCacheEvent<T>> filter) + { + return event -> { + if ( filter.test(event) ) + { + listener.event(event); + } + }; + } + + /** + * Filters out all but CRUD events + * + * @return predicate + */ + static <T> Predicate<ModeledCacheEvent<T>> nodeEventFilter() + { + return event -> (event.getType() == ModeledCacheEventType.NODE_ADDED) + || (event.getType() == ModeledCacheEventType.NODE_UPDATED) + || (event.getType() == ModeledCacheEventType.NODE_REMOVED) + ; + } + + /** + * Filters out all but {@link ModeledCacheEventType#NODE_REMOVED} events + * + * @return predicate + */ + static <T> Predicate<ModeledCacheEvent<T>> nodeRemovedFilter() + { + return event -> event.getType() == ModeledCacheEventType.NODE_REMOVED; + } + + /** + * Filters out all but events that have valid model instances + * + * @return predicate + */ + static <T> Predicate<ModeledCacheEvent<T>> hasModelFilter() + { + return event -> event.getNode().isPresent() && event.getNode().get().getData().isPresent(); + } } http://git-wip-us.apache.org/repos/asf/curator/blob/301e9892/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java index 65b4e37..bacd516 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.async.CompletableBaseClassForTests; @@ -130,6 +131,7 @@ public class TestModeledCaches extends CompletableBaseClassForTests Assert.assertNotNull(event1); Assert.assertNotNull(event2); Assert.assertEquals(event1.getType(), ModeledCacheEventType.NODE_ADDED); + Assert.assertEquals(event2.getType(), ModeledCacheEventType.NODE_ADDED); Assert.assertEquals(event1.getNode().isPresent() ? event1.getNode().get().getData().orElse(null) : null, model1); Assert.assertEquals(event2.getNode().isPresent() ? event2.getNode().get().getData().orElse(null) : null, model2); Assert.assertEquals(event1.getNode().get().getPath(), path.at("1")); @@ -153,4 +155,65 @@ public class TestModeledCaches extends CompletableBaseClassForTests Assert.assertNull(events.poll(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS)); // listener is removed - shouldn't get an event } } + + @Test + public void testModeledPathChildrenCacheWithoutData() throws InterruptedException + { + try ( ModeledPathChildrenCache<TestModel> cache = ModeledPathChildrenCache.wrap(new PathChildrenCache(client, path.fullPath(), false), serializer) ) + { + cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + + BlockingQueue<ModeledCacheEvent<TestModel>> events = new LinkedBlockingQueue<>(); + ModeledCacheListener<TestModel> listener = events::add; + cache.getListenable().addListener(listener); + + TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.TEN); + TestModel model2 = new TestModel("d", "e", "f", 10, BigInteger.ONE); + + modeled.at("1").create(model1).thenApply(__ -> modeled.at("2").create(model2)); + ModeledCacheEvent<TestModel> event1 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + ModeledCacheEvent<TestModel> event2 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertNotNull(event1); + Assert.assertNotNull(event2); + Assert.assertEquals(event1.getType(), ModeledCacheEventType.NODE_ADDED); + Assert.assertTrue(event1.getNode().isPresent()); + Assert.assertTrue(event2.getNode().isPresent()); + Assert.assertFalse(event1.getNode().get().getData().isPresent()); + Assert.assertFalse(event2.getNode().get().getData().isPresent()); + } + } + + @Test + public void testModeledTreeCacheWithData() throws Exception + { + try (ModeledTreeCache<TestModel> cache = ModeledTreeCache.wrap(TreeCache.newBuilder(client, path.fullPath()).build(),serializer) ) + { + BlockingQueue<ModeledCacheEvent<TestModel>> events = new LinkedBlockingQueue<>(); + ModeledCacheListener<TestModel> listener = ModeledCacheListener.filtered(events::add, ModeledCacheListener.<TestModel>nodeRemovedFilter().or(ModeledCacheListener.hasModelFilter())); + cache.getListenable().addListener(listener); + + cache.start(); + + TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.TEN); + TestModel model2 = new TestModel("d", "e", "f", 10, BigInteger.ONE); + TestModel model3 = new TestModel("g", "h", "i", 100, BigInteger.ZERO); + + modeled.at("1").create(model1).thenApply(__ -> modeled.at("1").at("2").create(model2).thenApply(___ -> modeled.at("1").at("2").at("3").create(model3))); + ModeledCacheEvent<TestModel> event1 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + ModeledCacheEvent<TestModel> event2 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + ModeledCacheEvent<TestModel> event3 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertNotNull(event1); + Assert.assertNotNull(event2); + Assert.assertNotNull(event3); + Assert.assertEquals(event1.getType(), ModeledCacheEventType.NODE_ADDED); + Assert.assertEquals(event2.getType(), ModeledCacheEventType.NODE_ADDED); + Assert.assertEquals(event3.getType(), ModeledCacheEventType.NODE_ADDED); + Assert.assertEquals(event1.getNode().isPresent() ? event1.getNode().get().getData().orElse(null) : null, model1); + Assert.assertEquals(event2.getNode().isPresent() ? event2.getNode().get().getData().orElse(null) : null, model2); + Assert.assertEquals(event3.getNode().isPresent() ? event3.getNode().get().getData().orElse(null) : null, model3); + Assert.assertEquals(event1.getNode().get().getPath(), path.at("1")); + Assert.assertEquals(event2.getNode().get().getPath(), path.at("1").at("2")); + Assert.assertEquals(event3.getNode().get().getPath(), path.at("1").at("2").at("3")); + } + } }