Repository: curator Updated Branches: refs/heads/CURATOR-397 6188fe6ce -> 63225ba75
basics of write-through caching added. needs more work and testing Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/63225ba7 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/63225ba7 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/63225ba7 Branch: refs/heads/CURATOR-397 Commit: 63225ba7562c48d9ac53bfaa50aa8ff0a790eb9c Parents: 6188fe6 Author: randgalt <randg...@apache.org> Authored: Tue Apr 25 12:33:30 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Tue Apr 25 12:33:30 2017 -0500 ---------------------------------------------------------------------- curator-x-async/pom.xml | 6 + .../async/modeled/ModeledCuratorFramework.java | 10 + .../modeled/ModeledCuratorFrameworkBuilder.java | 35 +++- .../x/async/modeled/caching/Caching.java | 26 +++ .../x/async/modeled/caching/CachingOption.java | 10 + .../x/async/modeled/details/CachingImpl.java | 157 ++++++++++++++ .../x/async/modeled/details/ModelStage.java | 8 +- .../details/ModeledCuratorFrameworkImpl.java | 203 ++++++++++++++++--- .../x/async/modeled/details/ZPathImpl.java | 22 +- .../TestCachedModeledCuratorFramework.java | 96 +++++++++ .../x/async/modeled/models/TestSimpleModel.java | 66 ++++++ .../src/test/resources/log4j.properties | 27 +++ 12 files changed, 627 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/pom.xml ---------------------------------------------------------------------- diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml index 4d645db..925896b 100644 --- a/curator-x-async/pom.xml +++ b/curator-x-async/pom.xml @@ -42,6 +42,12 @@ <artifactId>testng</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java index 00335d4..1accb72 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java @@ -23,6 +23,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.api.DeleteOption; +import org.apache.curator.x.async.modeled.caching.Caching; import org.apache.zookeeper.data.Stat; import java.util.List; import java.util.Set; @@ -72,6 +73,15 @@ public interface ModeledCuratorFramework<T> CuratorFramework unwrap(); /** + * Return the caching APIs. Only valid if {@link ModeledCuratorFrameworkBuilder#cached()} or + * {@link ModeledCuratorFrameworkBuilder#cached(java.util.Set)} was called when building the instance. + * + * @return caching APIs + * @throws java.lang.IllegalStateException if caching was not enabled when building the instance + */ + Caching<T> caching(); + + /** * Return a new Modeled Curator instance with all the same options but applying to the given child node of this Modeled Curator's * path. E.g. if this Modeled Curator instance applies to "/a/b", calling <code>modeled.at("c")</code> returns an instance that applies to * "/a/b/c". http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java index ec15d99..46f9cc3 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java @@ -25,10 +25,12 @@ import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.api.DeleteOption; +import org.apache.curator.x.async.modeled.caching.CachingOption; import org.apache.curator.x.async.modeled.details.ModeledCuratorFrameworkImpl; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.ACL; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; @@ -43,10 +45,12 @@ public class ModeledCuratorFrameworkBuilder<T> private UnaryOperator<WatchedEvent> watcherFilter; private UnhandledErrorListener unhandledErrorListener; private UnaryOperator<CuratorEvent> resultFilter; - private CreateMode createMode; - private List<ACL> aclList; - private Set<CreateOption> createOptions; - private Set<DeleteOption> deleteOptions; + private CreateMode createMode = CreateMode.PERSISTENT; + private List<ACL> aclList = Collections.emptyList(); + private Set<CreateOption> createOptions = Collections.emptySet(); + private Set<DeleteOption> deleteOptions = Collections.emptySet(); + private Set<CachingOption> cachingOptions = Collections.emptySet(); + private boolean cached = false; /** * Build a new ModeledCuratorFramework instance @@ -55,9 +59,10 @@ public class ModeledCuratorFrameworkBuilder<T> */ public ModeledCuratorFramework<T> build() { - return new ModeledCuratorFrameworkImpl<>( + String fullPath = this.path.fullPath(); + return ModeledCuratorFrameworkImpl.build( client, - path.fullPath(), + fullPath, serializer, watchMode, watcherFilter, @@ -66,7 +71,9 @@ public class ModeledCuratorFrameworkBuilder<T> createMode, aclList, createOptions, - deleteOptions + deleteOptions, + cachingOptions, + cached ); } @@ -183,6 +190,20 @@ public class ModeledCuratorFrameworkBuilder<T> return this; } + public ModeledCuratorFrameworkBuilder<T> cached() + { + this.cachingOptions = Collections.emptySet(); + this.cached = true; + return this; + } + + public ModeledCuratorFrameworkBuilder<T> cached(Set<CachingOption> cachingOptions) + { + this.cachingOptions = Objects.requireNonNull(cachingOptions, "cachingOptions cannot be null"); + this.cached = true; + return this; + } + ModeledCuratorFrameworkBuilder(CuratorFramework client, ZPath path, ModelSerializer<T> serializer) { this.client = Objects.requireNonNull(client, "client cannot be null"); http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/Caching.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/Caching.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/Caching.java new file mode 100644 index 0000000..87637ca --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/Caching.java @@ -0,0 +1,26 @@ +package org.apache.curator.x.async.modeled.caching; + +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.x.async.modeled.recipes.ModeledCacheListener; +import java.io.Closeable; + +public interface Caching<T> extends Closeable +{ + /** + * Forwards to the internal cache's start method. Not idempotent. + */ + void start(); + + /** + * Forwards to the internal cache's close method. + */ + @Override + void close(); + + /** + * Return the listener container so that you can add/remove listeners + * + * @return listener container + */ + Listenable<ModeledCacheListener<T>> getListenable(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/CachingOption.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/CachingOption.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/CachingOption.java new file mode 100644 index 0000000..9b9ec08 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/CachingOption.java @@ -0,0 +1,10 @@ +package org.apache.curator.x.async.modeled.caching; + +public enum CachingOption +{ + metaDataOnly, + + dirtyReads, + + createParentNodes +} http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java new file mode 100644 index 0000000..9730d05 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java @@ -0,0 +1,157 @@ +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.x.async.api.CreateOption; +import org.apache.curator.x.async.modeled.ModelSerializer; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.caching.Caching; +import org.apache.curator.x.async.modeled.caching.CachingOption; +import org.apache.curator.x.async.modeled.recipes.ModeledCacheEvent; +import org.apache.curator.x.async.modeled.recipes.ModeledCacheListener; +import org.apache.curator.x.async.modeled.recipes.ModeledCachedNode; +import org.apache.curator.x.async.modeled.recipes.ModeledTreeCache; +import org.apache.zookeeper.data.Stat; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +class CachingImpl<T> implements Caching<T>, ModeledCacheListener<T> +{ + private final ModeledTreeCache<T> treeCache; + private final AtomicLong dirtyZxid = new AtomicLong(-1); + private final ZPath path; + private final boolean dirtyReads; + + CachingImpl(CuratorFramework client, ModelSerializer<T> serializer, ZPath path, Set<CachingOption> cachingOptions, Set<CreateOption> createOptions) + { + this.path = path; + TreeCache.Builder builder = TreeCache.newBuilder(client, path.fullPath()); + builder = builder.setCacheData(!cachingOptions.contains(CachingOption.metaDataOnly)).setCreateParentNodes(cachingOptions.contains(CachingOption.createParentNodes)); + if ( ModeledCuratorFrameworkImpl.isCompressed(createOptions) ) + { + builder = builder.setDataIsCompressed(true); + } + TreeCache cache = builder.build(); + treeCache = ModeledTreeCache.wrap(cache, serializer); + + treeCache.getListenable().addListener(this); + + dirtyReads = cachingOptions.contains(CachingOption.dirtyReads); + } + + CachingImpl<T> at(String child) + { + return new CachingImpl<>(treeCache, path.at(child), dirtyReads); + } + + @Override + public void start() + { + treeCache.start(); + } + + @Override + public void close() + { + treeCache.close(); + } + + @Override + public Listenable<ModeledCacheListener<T>> getListenable() + { + return treeCache.getListenable(); + } + + @Override + public void event(ModeledCacheEvent<T> event) + { + switch ( event.getType() ) + { + case NODE_ADDED: + case NODE_UPDATED: + { + updateDirtyZxid(event.getNode().getStat().getMzxid()); + break; + } + + case NODE_REMOVED: + { + // TODO + break; + } + + case CONNECTION_RECONNECTED: + { + dirtyZxid.set(-1); + break; + } + } + } + + long getCurrentZxid() + { + return treeCache.getCurrentData(path).map(cache -> (cache.getStat() != null) ? cache.getStat().getMzxid() : -1).orElse(-1L); + } + + void markDirty(long zxid) + { + if ( !dirtyReads && (zxid >= 0) ) + { + long currentDirtyZxid = dirtyZxid.get(); + if ( zxid > currentDirtyZxid ) + { + dirtyZxid.compareAndSet(currentDirtyZxid, zxid); + } + } + } + + ModeledTreeCache<T> getCache() + { + return treeCache; + } + + ModeledCachedNode<T> getCacheIf() + { + Optional<ModeledCachedNode<T>> currentData = treeCache.getCurrentData(path); + return currentData.map(this::getDataWithDirtyCheck).orElse(null); + } + + private ModeledCachedNode<T> getDataWithDirtyCheck(ModeledCachedNode<T> data) + { + Stat stat = data.getStat(); + if ( stat != null ) + { + if ( stat.getMzxid() > dirtyZxid.get() ) + { + return data; + } + } + return null; + } + + private void updateDirtyZxid(long newZxid) + { + if ( dirtyReads ) + { + return; + } + + long currentDirtyZxid = dirtyZxid.get(); + if ( (currentDirtyZxid >= 0) && (newZxid > currentDirtyZxid) ) + { + if ( !dirtyZxid.compareAndSet(currentDirtyZxid, -1) ) + { + updateDirtyZxid(newZxid); + } + } + } + + private CachingImpl(ModeledTreeCache<T> treeCache, ZPath path, boolean dirtyReads) + { + this.treeCache = treeCache; + this.path = path; + this.dirtyReads = dirtyReads; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java index 41523cc..c28b133 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java @@ -29,7 +29,7 @@ class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T> ModelStage() { - this(null); + event = null; } ModelStage(CompletionStage<WatchedEvent> event) @@ -37,6 +37,12 @@ class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T> this.event = event; } + ModelStage(T value) + { + event = null; + complete(value); + } + @Override public CompletionStage<WatchedEvent> event() { http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java index 6898b2d..a6fc968 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java @@ -18,11 +18,11 @@ */ package org.apache.curator.x.async.modeled.details; -import com.google.common.collect.ImmutableSet; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; -import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; @@ -35,14 +35,18 @@ import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModelSerializer; import org.apache.curator.x.async.modeled.ModeledCuratorFramework; import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.caching.Caching; +import org.apache.curator.x.async.modeled.caching.CachingOption; +import org.apache.curator.x.async.modeled.recipes.ModeledCachedNode; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import java.util.Collections; +import org.apache.zookeeper.server.DataTree; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -50,8 +54,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T { private final AsyncCuratorFramework client; private final WatchableAsyncCuratorFramework watchableClient; - private final String path; - private final ZPath zPath; + private final ZPath path; private final ModelSerializer<T> serializer; private final WatchMode watchMode; private final UnaryOperator<WatchedEvent> watcherFilter; @@ -62,26 +65,63 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T private final Set<CreateOption> createOptions; private final Set<DeleteOption> deleteOptions; private final AsyncCuratorFrameworkDsl dslClient; + private final CachingImpl<T> caching; - public ModeledCuratorFrameworkImpl(CuratorFramework client, String path, ModelSerializer<T> serializer, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, CreateMode createMode, List<ACL> aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions) + public static <T> ModeledCuratorFrameworkImpl<T> build(CuratorFramework client, String path, ModelSerializer<T> serializer, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, CreateMode createMode, List<ACL> aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions, Set<CachingOption> cachingOptions, boolean cached) { boolean localIsWatched = (watchMode != null); - this.client = AsyncCuratorFramework.wrap(client); - this.path = Objects.requireNonNull(path, "path cannot be null"); - this.serializer = Objects.requireNonNull(serializer, "serializer cannot be null"); - this.watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess; + Objects.requireNonNull(client, "client cannot be null"); + Objects.requireNonNull(path, "path cannot be null"); + Objects.requireNonNull(serializer, "serializer cannot be null"); + Objects.requireNonNull(createOptions, "createOptions cannot be null"); + Objects.requireNonNull(createMode, "createMode cannot be null"); + Objects.requireNonNull(aclList, "aclList cannot be null"); + + watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess; + + ZPath zPath = ZPath.parse(path); + + AsyncCuratorFramework asyncClient = AsyncCuratorFramework.wrap(client); + AsyncCuratorFrameworkDsl dslClient = asyncClient.with(watchMode, unhandledErrorListener, resultFilter, watcherFilter); + WatchableAsyncCuratorFramework watchableClient = localIsWatched ? dslClient.watched() : dslClient; + + CachingImpl<T> caching = cached ? new CachingImpl<>(client, serializer, zPath, cachingOptions, createOptions) : null; + + return new ModeledCuratorFrameworkImpl<>( + asyncClient, + dslClient, + watchableClient, + zPath, + serializer, + watchMode, + watcherFilter, + unhandledErrorListener, + resultFilter, + createMode, + aclList, + createOptions, + deleteOptions, + caching + ); + } + + private ModeledCuratorFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ZPath path, ModelSerializer<T> serializer, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, CreateMode createMode, List<ACL> aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions, CachingImpl<T> caching) + { + this.client = client; + this.dslClient = dslClient; + this.watchableClient = watchableClient; + this.path = path; + this.serializer = serializer; + this.watchMode = watchMode; this.watcherFilter = watcherFilter; this.unhandledErrorListener = unhandledErrorListener; this.resultFilter = resultFilter; - this.createMode = (createMode != null) ? createMode : CreateMode.PERSISTENT; + this.createMode = createMode; this.aclList = aclList; - this.createOptions = (createOptions != null) ? ImmutableSet.copyOf(createOptions) : Collections.emptySet(); - this.deleteOptions = (deleteOptions != null) ? ImmutableSet.copyOf(deleteOptions) : Collections.emptySet(); - - dslClient = this.client.with(this.watchMode, unhandledErrorListener, resultFilter, watcherFilter); - watchableClient = localIsWatched ? dslClient.watched() : dslClient; - zPath = ZPath.parse(path); + this.createOptions = createOptions; + this.deleteOptions = deleteOptions; + this.caching = caching; } @Override @@ -91,6 +131,13 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T } @Override + public Caching<T> caching() + { + Preconditions.checkState(caching != null, "Caching is not enabled for this instance"); + return caching; + } + + @Override public AsyncStage<String> create(T model) { return create(model, null); @@ -99,8 +146,17 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T @Override public AsyncStage<String> create(T model, Stat storingStatIn) { + long dirtyZxid = getDirtyZxid(); byte[] bytes = serializer.serialize(model); - return dslClient.create().withOptions(createOptions, createMode, aclList, storingStatIn).forPath(path, bytes); + AsyncStage<String> asyncStage = dslClient.create().withOptions(createOptions, createMode, fixAclList(aclList), storingStatIn).forPath(path.fullPath(), bytes); + ModelStage<String> modelStage = new ModelStage<>(asyncStage.event()); + markDirtyCompleter(dirtyZxid, asyncStage, modelStage); + return modelStage; + } + + private List<ACL> fixAclList(List<ACL> aclList) + { + return (aclList.size() > 0) ? aclList : null; // workaround for old, bad design. empty list not accepted } @Override @@ -109,9 +165,29 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T return read(null); } + @VisibleForTesting + volatile AtomicInteger debugCachedReadCount = null; + @Override public AsyncStage<T> read(Stat storingStatIn) { + ModeledCachedNode<T> node = getCached(); + if ( node != null ) + { + if ( node.getModel() != null ) + { + if ( storingStatIn != null ) + { + DataTree.copyStat(node.getStat(), storingStatIn); + } + if ( debugCachedReadCount != null ) + { + debugCachedReadCount.incrementAndGet(); + } + return new ModelStage<>(node.getModel()); + } + } + AsyncPathable<AsyncStage<byte[]>> next; if ( isCompressed() ) { @@ -121,7 +197,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T { next = (storingStatIn != null) ? watchableClient.getData().storingStatIn(storingStatIn) : watchableClient.getData(); } - AsyncStage<byte[]> asyncStage = next.forPath(path); + AsyncStage<byte[]> asyncStage = next.forPath(path.fullPath()); ModelStage<T> modelStage = new ModelStage<>(asyncStage.event()); asyncStage.whenComplete((value, e) -> { if ( e != null ) @@ -130,7 +206,14 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T } else { - modelStage.complete(serializer.deserialize(value)); + try + { + modelStage.complete(serializer.deserialize(value)); + } + catch ( Exception deserializeException ) + { + modelStage.completeExceptionally(deserializeException); + } } }); return modelStage; @@ -145,15 +228,29 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T @Override public AsyncStage<Stat> update(T model, int version) { + long dirtyZxid = getDirtyZxid(); byte[] bytes = serializer.serialize(model); AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData(); - return next.forPath(path, bytes); + AsyncStage<Stat> asyncStage = next.forPath(path.fullPath(), bytes); + ModelStage<Stat> modelStage = new ModelStage<>(asyncStage.event()); + markDirtyCompleter(dirtyZxid, asyncStage, modelStage); + return modelStage; } @Override public AsyncStage<Stat> checkExists() { - return watchableClient.checkExists().forPath(path); + ModeledCachedNode<T> node = getCached(); + if ( node != null ) + { + AsyncStage<Stat> result = new ModelStage<>(node.getStat()); + if ( debugCachedReadCount != null ) + { + debugCachedReadCount.incrementAndGet(); + } + return result; + } + return watchableClient.checkExists().forPath(path.fullPath()); } @Override @@ -165,13 +262,17 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T @Override public AsyncStage<Void> delete(int version) { - return dslClient.delete().withVersion(-1).forPath(path); + long dirtyZxid = getDirtyZxid(); + AsyncStage<Void> asyncStage = dslClient.delete().withVersion(-1).forPath(path.fullPath()); + ModelStage<Void> modelStage = new ModelStage<>(asyncStage.event()); + markDirtyCompleter(dirtyZxid, asyncStage, modelStage); + return modelStage; } @Override public AsyncStage<List<ZPath>> getChildren() { - AsyncStage<List<String>> asyncStage = watchableClient.getChildren().forPath(path); + AsyncStage<List<String>> asyncStage = watchableClient.getChildren().forPath(path.fullPath()); ModelStage<List<ZPath>> modelStage = new ModelStage<>(asyncStage.event()); asyncStage.whenComplete((children, e) -> { if ( e != null ) @@ -180,7 +281,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T } else { - modelStage.complete(children.stream().map(zPath::at).collect(Collectors.toList())); + modelStage.complete(children.stream().map(path::at).collect(Collectors.toList())); } }); return modelStage; @@ -189,12 +290,60 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T @Override public ModeledCuratorFramework<T> at(String child) { - String childPath = ZKPaths.makePath(path, child); - return new ModeledCuratorFrameworkImpl<>(client.unwrap(), childPath, serializer, watchMode, watcherFilter, unhandledErrorListener, resultFilter, createMode, aclList, createOptions, deleteOptions); + ZPath childPath = path.at(child); + CachingImpl<T> newCaching = (caching != null) ? caching.at(child) : null; + return new ModeledCuratorFrameworkImpl<>( + client, + dslClient, + watchableClient, + childPath, serializer, + watchMode, + watcherFilter, + unhandledErrorListener, + resultFilter, + createMode, + aclList, + createOptions, + deleteOptions, + newCaching + ); + } + + public static boolean isCompressed(Set<CreateOption> createOptions) + { + return createOptions.contains(CreateOption.compress); + } + + private <U> void markDirtyCompleter(long dirtyZxid, AsyncStage<U> asyncStage, ModelStage<U> modelStage) + { + asyncStage.whenComplete((value, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + if ( caching != null ) + { + caching.markDirty(dirtyZxid); + } + modelStage.complete(value); + } + }); } private boolean isCompressed() { return createOptions.contains(CreateOption.compress); } + + private ModeledCachedNode<T> getCached() + { + return (caching != null) ? caching.getCacheIf() : null; + } + + private long getDirtyZxid() + { + return (caching != null) ? caching.getCurrentZxid() : -1L; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java index 35cae3c..75f2fe7 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java @@ -27,10 +27,13 @@ import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; public class ZPathImpl implements ZPath { public static final ZPath root = new ZPathImpl(Collections.singletonList(ZKPaths.PATH_SEPARATOR)); + public static final AtomicReference<String> fullPathCache = new AtomicReference<>(); + public static final AtomicReference<String> parentPathCache = new AtomicReference<>(); private final List<String> nodes; @@ -66,14 +69,14 @@ public class ZPathImpl implements ZPath @Override public String fullPath() { - return buildFullPath(nodes.size()); + return buildFullPath(false); } @Override public String parentPath() { checkRootAccess(); - return buildFullPath(nodes.size() - 1); + return buildFullPath(true); } @Override @@ -133,10 +136,18 @@ public class ZPathImpl implements ZPath } } - private String buildFullPath(int size) + private String buildFullPath(boolean parent) { + AtomicReference<String> cache = parent ? parentPathCache : fullPathCache; + String path = cache.get(); + if ( path != null ) + { + return path; + } + boolean addSeparator = false; StringBuilder str = new StringBuilder(); + int size = parent ? (nodes.size() - 1) : nodes.size(); for ( int i = 0; i < size; ++i ) { if ( i > 1 ) @@ -145,6 +156,9 @@ public class ZPathImpl implements ZPath } str.append(nodes.get(i)); } - return str.toString(); + path = str.toString(); + + cache.compareAndSet(null, path); + return path; } } http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java new file mode 100644 index 0000000..160ec7d --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.async.CompletableBaseClassForTests; +import org.apache.curator.x.async.modeled.JacksonModelSerializer; +import org.apache.curator.x.async.modeled.ModelSerializer; +import org.apache.curator.x.async.modeled.ModeledCuratorFramework; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.caching.CachingOption; +import org.apache.curator.x.async.modeled.models.TestSimpleModel; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestCachedModeledCuratorFramework extends CompletableBaseClassForTests +{ + private static final ZPath path = ZPath.parse("/test/path"); + private CuratorFramework rawClient; + private ModeledCuratorFramework<TestSimpleModel> client; + + @BeforeMethod + @Override + public void setup() throws Exception + { + super.setup(); + + rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + rawClient.start(); + + ModelSerializer<TestSimpleModel> serializer = new JacksonModelSerializer<>(TestSimpleModel.class); + client = ModeledCuratorFramework.builder(rawClient, path, serializer).cached().build(); + } + + @AfterMethod + @Override + public void teardown() throws Exception + { + CloseableUtils.closeQuietly(rawClient); + super.teardown(); + } + + @Test + public void testBasic() throws InterruptedException + { + client.caching().start(); + + AtomicInteger counter = new AtomicInteger(); + ((ModeledCuratorFrameworkImpl)client).debugCachedReadCount = counter; + + complete(client.read()); + Assert.assertEquals(counter.get(), 0); + + complete(client.create(new TestSimpleModel("test", 10))); + Assert.assertEquals(counter.get(), 0); + + timing.sleepABit(); + + complete(client.read()); + Assert.assertEquals(counter.get(), 1); + counter.set(0); + + complete(client.create(new TestSimpleModel("test2", 20))); + Assert.assertEquals(counter.get(), 0); + + timing.sleepABit(); + + complete(client.read(), (model, e) -> Assert.assertEquals(model, new TestSimpleModel("test2", 20))); + Assert.assertEquals(counter.get(), 1); + + client.caching().close(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/models/TestSimpleModel.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/models/TestSimpleModel.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/models/TestSimpleModel.java new file mode 100644 index 0000000..def50c5 --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/models/TestSimpleModel.java @@ -0,0 +1,66 @@ +package org.apache.curator.x.async.modeled.models; + +import java.util.Objects; + +public class TestSimpleModel +{ + private final String name; + private final int age; + + public TestSimpleModel() + { + this("", 0); + } + + public TestSimpleModel(String name, int age) + { + this.name = Objects.requireNonNull(name, "name cannot be null"); + this.age = Objects.requireNonNull(age, "age cannot be null"); + } + + public String getName() + { + return name; + } + + public int getAge() + { + return age; + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + TestSimpleModel that = (TestSimpleModel)o; + + //noinspection SimplifiableIfStatement + if ( age != that.age ) + { + return false; + } + return name.equals(that.name); + } + + @Override + public int hashCode() + { + int result = name.hashCode(); + result = 31 * result + age; + return result; + } + + @Override + public String toString() + { + return "TestSimpleModel{" + "name='" + name + '\'' + ", age=" + age + '}'; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/resources/log4j.properties b/curator-x-async/src/test/resources/log4j.properties new file mode 100644 index 0000000..2a85e0d --- /dev/null +++ b/curator-x-async/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=ERROR, console + +log4j.logger.org.apache.curator=DEBUG, console +log4j.additivity.org.apache.curator=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n