Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 6188fe6ce -> 63225ba75


basics of write-through caching added. needs more work and testing


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/63225ba7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/63225ba7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/63225ba7

Branch: refs/heads/CURATOR-397
Commit: 63225ba7562c48d9ac53bfaa50aa8ff0a790eb9c
Parents: 6188fe6
Author: randgalt <randg...@apache.org>
Authored: Tue Apr 25 12:33:30 2017 -0500
Committer: randgalt <randg...@apache.org>
Committed: Tue Apr 25 12:33:30 2017 -0500

----------------------------------------------------------------------
 curator-x-async/pom.xml                         |   6 +
 .../async/modeled/ModeledCuratorFramework.java  |  10 +
 .../modeled/ModeledCuratorFrameworkBuilder.java |  35 +++-
 .../x/async/modeled/caching/Caching.java        |  26 +++
 .../x/async/modeled/caching/CachingOption.java  |  10 +
 .../x/async/modeled/details/CachingImpl.java    | 157 ++++++++++++++
 .../x/async/modeled/details/ModelStage.java     |   8 +-
 .../details/ModeledCuratorFrameworkImpl.java    | 203 ++++++++++++++++---
 .../x/async/modeled/details/ZPathImpl.java      |  22 +-
 .../TestCachedModeledCuratorFramework.java      |  96 +++++++++
 .../x/async/modeled/models/TestSimpleModel.java |  66 ++++++
 .../src/test/resources/log4j.properties         |  27 +++
 12 files changed, 627 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml
index 4d645db..925896b 100644
--- a/curator-x-async/pom.xml
+++ b/curator-x-async/pom.xml
@@ -42,6 +42,12 @@
             <artifactId>testng</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java
index 00335d4..1accb72 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.api.DeleteOption;
+import org.apache.curator.x.async.modeled.caching.Caching;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
 import java.util.Set;
@@ -72,6 +73,15 @@ public interface ModeledCuratorFramework<T>
     CuratorFramework unwrap();
 
     /**
+     * Return the caching APIs. Only valid if {@link 
ModeledCuratorFrameworkBuilder#cached()} or
+     * {@link ModeledCuratorFrameworkBuilder#cached(java.util.Set)} was called 
when building the instance.
+     *
+     * @return caching APIs
+     * @throws java.lang.IllegalStateException if caching was not enabled when 
building the instance
+     */
+    Caching<T> caching();
+
+    /**
      * Return a new Modeled Curator instance with all the same options but 
applying to the given child node of this Modeled Curator's
      * path. E.g. if this Modeled Curator instance applies to "/a/b", calling 
<code>modeled.at("c")</code> returns an instance that applies to
      * "/a/b/c".

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java
index ec15d99..46f9cc3 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFrameworkBuilder.java
@@ -25,10 +25,12 @@ import 
org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.api.DeleteOption;
+import org.apache.curator.x.async.modeled.caching.CachingOption;
 import org.apache.curator.x.async.modeled.details.ModeledCuratorFrameworkImpl;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -43,10 +45,12 @@ public class ModeledCuratorFrameworkBuilder<T>
     private UnaryOperator<WatchedEvent> watcherFilter;
     private UnhandledErrorListener unhandledErrorListener;
     private UnaryOperator<CuratorEvent> resultFilter;
-    private CreateMode createMode;
-    private List<ACL> aclList;
-    private Set<CreateOption> createOptions;
-    private Set<DeleteOption> deleteOptions;
+    private CreateMode createMode = CreateMode.PERSISTENT;
+    private List<ACL> aclList = Collections.emptyList();
+    private Set<CreateOption> createOptions = Collections.emptySet();
+    private Set<DeleteOption> deleteOptions = Collections.emptySet();
+    private Set<CachingOption> cachingOptions = Collections.emptySet();
+    private boolean cached = false;
 
     /**
      * Build a new ModeledCuratorFramework instance
@@ -55,9 +59,10 @@ public class ModeledCuratorFrameworkBuilder<T>
      */
     public ModeledCuratorFramework<T> build()
     {
-        return new ModeledCuratorFrameworkImpl<>(
+        String fullPath = this.path.fullPath();
+        return ModeledCuratorFrameworkImpl.build(
             client,
-            path.fullPath(),
+            fullPath,
             serializer,
             watchMode,
             watcherFilter,
@@ -66,7 +71,9 @@ public class ModeledCuratorFrameworkBuilder<T>
             createMode,
             aclList,
             createOptions,
-            deleteOptions
+            deleteOptions,
+            cachingOptions,
+            cached
         );
     }
 
@@ -183,6 +190,20 @@ public class ModeledCuratorFrameworkBuilder<T>
         return this;
     }
 
+    public ModeledCuratorFrameworkBuilder<T> cached()
+    {
+        this.cachingOptions = Collections.emptySet();
+        this.cached = true;
+        return this;
+    }
+
+    public ModeledCuratorFrameworkBuilder<T> cached(Set<CachingOption> 
cachingOptions)
+    {
+        this.cachingOptions = Objects.requireNonNull(cachingOptions, 
"cachingOptions cannot be null");
+        this.cached = true;
+        return this;
+    }
+
     ModeledCuratorFrameworkBuilder(CuratorFramework client, ZPath path, 
ModelSerializer<T> serializer)
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/Caching.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/Caching.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/Caching.java
new file mode 100644
index 0000000..87637ca
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/Caching.java
@@ -0,0 +1,26 @@
+package org.apache.curator.x.async.modeled.caching;
+
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.x.async.modeled.recipes.ModeledCacheListener;
+import java.io.Closeable;
+
+public interface Caching<T> extends Closeable
+{
+    /**
+     * Forwards to the internal cache's start method. Not idempotent.
+     */
+    void start();
+
+    /**
+     * Forwards to the internal cache's close method.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the listener container so that you can add/remove listeners
+     *
+     * @return listener container
+     */
+    Listenable<ModeledCacheListener<T>> getListenable();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/CachingOption.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/CachingOption.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/CachingOption.java
new file mode 100644
index 0000000..9b9ec08
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/caching/CachingOption.java
@@ -0,0 +1,10 @@
+package org.apache.curator.x.async.modeled.caching;
+
+public enum CachingOption
+{
+    metaDataOnly,
+
+    dirtyReads,
+
+    createParentNodes
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java
new file mode 100644
index 0000000..9730d05
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java
@@ -0,0 +1,157 @@
+package org.apache.curator.x.async.modeled.details;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.x.async.api.CreateOption;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.caching.Caching;
+import org.apache.curator.x.async.modeled.caching.CachingOption;
+import org.apache.curator.x.async.modeled.recipes.ModeledCacheEvent;
+import org.apache.curator.x.async.modeled.recipes.ModeledCacheListener;
+import org.apache.curator.x.async.modeled.recipes.ModeledCachedNode;
+import org.apache.curator.x.async.modeled.recipes.ModeledTreeCache;
+import org.apache.zookeeper.data.Stat;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+class CachingImpl<T> implements Caching<T>, ModeledCacheListener<T>
+{
+    private final ModeledTreeCache<T> treeCache;
+    private final AtomicLong dirtyZxid = new AtomicLong(-1);
+    private final ZPath path;
+    private final boolean dirtyReads;
+
+    CachingImpl(CuratorFramework client, ModelSerializer<T> serializer, ZPath 
path, Set<CachingOption> cachingOptions, Set<CreateOption> createOptions)
+    {
+        this.path = path;
+        TreeCache.Builder builder = TreeCache.newBuilder(client, 
path.fullPath());
+        builder = 
builder.setCacheData(!cachingOptions.contains(CachingOption.metaDataOnly)).setCreateParentNodes(cachingOptions.contains(CachingOption.createParentNodes));
+        if ( ModeledCuratorFrameworkImpl.isCompressed(createOptions) )
+        {
+            builder = builder.setDataIsCompressed(true);
+        }
+        TreeCache cache = builder.build();
+        treeCache = ModeledTreeCache.wrap(cache, serializer);
+
+        treeCache.getListenable().addListener(this);
+
+        dirtyReads = cachingOptions.contains(CachingOption.dirtyReads);
+    }
+
+    CachingImpl<T> at(String child)
+    {
+        return new CachingImpl<>(treeCache, path.at(child), dirtyReads);
+    }
+
+    @Override
+    public void start()
+    {
+        treeCache.start();
+    }
+
+    @Override
+    public void close()
+    {
+        treeCache.close();
+    }
+
+    @Override
+    public Listenable<ModeledCacheListener<T>> getListenable()
+    {
+        return treeCache.getListenable();
+    }
+
+    @Override
+    public void event(ModeledCacheEvent<T> event)
+    {
+        switch ( event.getType() )
+        {
+            case NODE_ADDED:
+            case NODE_UPDATED:
+            {
+                updateDirtyZxid(event.getNode().getStat().getMzxid());
+                break;
+            }
+
+            case NODE_REMOVED:
+            {
+                // TODO
+                break;
+            }
+
+            case CONNECTION_RECONNECTED:
+            {
+                dirtyZxid.set(-1);
+                break;
+            }
+        }
+    }
+
+    long getCurrentZxid()
+    {
+        return treeCache.getCurrentData(path).map(cache -> (cache.getStat() != 
null) ? cache.getStat().getMzxid() : -1).orElse(-1L);
+    }
+
+    void markDirty(long zxid)
+    {
+        if ( !dirtyReads && (zxid >= 0) )
+        {
+            long currentDirtyZxid = dirtyZxid.get();
+            if ( zxid > currentDirtyZxid )
+            {
+                dirtyZxid.compareAndSet(currentDirtyZxid, zxid);
+            }
+        }
+    }
+
+    ModeledTreeCache<T> getCache()
+    {
+        return treeCache;
+    }
+
+    ModeledCachedNode<T> getCacheIf()
+    {
+        Optional<ModeledCachedNode<T>> currentData = 
treeCache.getCurrentData(path);
+        return currentData.map(this::getDataWithDirtyCheck).orElse(null);
+    }
+
+    private ModeledCachedNode<T> getDataWithDirtyCheck(ModeledCachedNode<T> 
data)
+    {
+        Stat stat = data.getStat();
+        if ( stat != null )
+        {
+            if ( stat.getMzxid() > dirtyZxid.get() )
+            {
+                return data;
+            }
+        }
+        return null;
+    }
+
+    private void updateDirtyZxid(long newZxid)
+    {
+        if ( dirtyReads )
+        {
+            return;
+        }
+
+        long currentDirtyZxid = dirtyZxid.get();
+        if ( (currentDirtyZxid >= 0) && (newZxid > currentDirtyZxid) )
+        {
+            if ( !dirtyZxid.compareAndSet(currentDirtyZxid, -1) )
+            {
+                updateDirtyZxid(newZxid);
+            }
+        }
+    }
+
+    private CachingImpl(ModeledTreeCache<T> treeCache, ZPath path, boolean 
dirtyReads)
+    {
+        this.treeCache = treeCache;
+        this.path = path;
+        this.dirtyReads = dirtyReads;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
index 41523cc..c28b133 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
@@ -29,7 +29,7 @@ class ModelStage<T> extends CompletableFuture<T> implements 
AsyncStage<T>
 
     ModelStage()
     {
-        this(null);
+        event = null;
     }
 
     ModelStage(CompletionStage<WatchedEvent> event)
@@ -37,6 +37,12 @@ class ModelStage<T> extends CompletableFuture<T> implements 
AsyncStage<T>
         this.event = event;
     }
 
+    ModelStage(T value)
+    {
+        event = null;
+        complete(value);
+    }
+
     @Override
     public CompletionStage<WatchedEvent> event()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java
index 6898b2d..a6fc968 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java
@@ -18,11 +18,11 @@
  */
 package org.apache.curator.x.async.modeled.details;
 
-import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.WatchMode;
@@ -35,14 +35,18 @@ import 
org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModeledCuratorFramework;
 import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.caching.Caching;
+import org.apache.curator.x.async.modeled.caching.CachingOption;
+import org.apache.curator.x.async.modeled.recipes.ModeledCachedNode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import java.util.Collections;
+import org.apache.zookeeper.server.DataTree;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
@@ -50,8 +54,7 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
 {
     private final AsyncCuratorFramework client;
     private final WatchableAsyncCuratorFramework watchableClient;
-    private final String path;
-    private final ZPath zPath;
+    private final ZPath path;
     private final ModelSerializer<T> serializer;
     private final WatchMode watchMode;
     private final UnaryOperator<WatchedEvent> watcherFilter;
@@ -62,26 +65,63 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
     private final Set<CreateOption> createOptions;
     private final Set<DeleteOption> deleteOptions;
     private final AsyncCuratorFrameworkDsl dslClient;
+    private final CachingImpl<T> caching;
 
-    public ModeledCuratorFrameworkImpl(CuratorFramework client, String path, 
ModelSerializer<T> serializer, WatchMode watchMode, UnaryOperator<WatchedEvent> 
watcherFilter, UnhandledErrorListener unhandledErrorListener, 
UnaryOperator<CuratorEvent> resultFilter, CreateMode createMode, List<ACL> 
aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions)
+    public static <T> ModeledCuratorFrameworkImpl<T> build(CuratorFramework 
client, String path, ModelSerializer<T> serializer, WatchMode watchMode, 
UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener 
unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, CreateMode 
createMode, List<ACL> aclList, Set<CreateOption> createOptions, 
Set<DeleteOption> deleteOptions, Set<CachingOption> cachingOptions, boolean 
cached)
     {
         boolean localIsWatched = (watchMode != null);
 
-        this.client = AsyncCuratorFramework.wrap(client);
-        this.path = Objects.requireNonNull(path, "path cannot be null");
-        this.serializer = Objects.requireNonNull(serializer, "serializer 
cannot be null");
-        this.watchMode = (watchMode != null) ? watchMode : 
WatchMode.stateChangeAndSuccess;
+        Objects.requireNonNull(client, "client cannot be null");
+        Objects.requireNonNull(path, "path cannot be null");
+        Objects.requireNonNull(serializer, "serializer cannot be null");
+        Objects.requireNonNull(createOptions, "createOptions cannot be null");
+        Objects.requireNonNull(createMode, "createMode cannot be null");
+        Objects.requireNonNull(aclList, "aclList cannot be null");
+
+        watchMode = (watchMode != null) ? watchMode : 
WatchMode.stateChangeAndSuccess;
+
+        ZPath zPath = ZPath.parse(path);
+
+        AsyncCuratorFramework asyncClient = AsyncCuratorFramework.wrap(client);
+        AsyncCuratorFrameworkDsl dslClient = asyncClient.with(watchMode, 
unhandledErrorListener, resultFilter, watcherFilter);
+        WatchableAsyncCuratorFramework watchableClient = localIsWatched ? 
dslClient.watched() : dslClient;
+
+        CachingImpl<T> caching = cached ? new CachingImpl<>(client, 
serializer, zPath, cachingOptions, createOptions) : null;
+
+        return new ModeledCuratorFrameworkImpl<>(
+            asyncClient,
+            dslClient,
+            watchableClient,
+            zPath,
+            serializer,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            createMode,
+            aclList,
+            createOptions,
+            deleteOptions,
+            caching
+        );
+    }
+
+    private ModeledCuratorFrameworkImpl(AsyncCuratorFramework client, 
AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework 
watchableClient, ZPath path, ModelSerializer<T> serializer, WatchMode 
watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener 
unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, CreateMode 
createMode, List<ACL> aclList, Set<CreateOption> createOptions, 
Set<DeleteOption> deleteOptions, CachingImpl<T> caching)
+    {
+        this.client = client;
+        this.dslClient = dslClient;
+        this.watchableClient = watchableClient;
+        this.path = path;
+        this.serializer = serializer;
+        this.watchMode = watchMode;
         this.watcherFilter = watcherFilter;
         this.unhandledErrorListener = unhandledErrorListener;
         this.resultFilter = resultFilter;
-        this.createMode = (createMode != null) ? createMode : 
CreateMode.PERSISTENT;
+        this.createMode = createMode;
         this.aclList = aclList;
-        this.createOptions = (createOptions != null) ? 
ImmutableSet.copyOf(createOptions) : Collections.emptySet();
-        this.deleteOptions = (deleteOptions != null) ? 
ImmutableSet.copyOf(deleteOptions) : Collections.emptySet();
-
-        dslClient = this.client.with(this.watchMode, unhandledErrorListener, 
resultFilter, watcherFilter);
-        watchableClient = localIsWatched ? dslClient.watched() : dslClient;
-        zPath = ZPath.parse(path);
+        this.createOptions = createOptions;
+        this.deleteOptions = deleteOptions;
+        this.caching = caching;
     }
 
     @Override
@@ -91,6 +131,13 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
     }
 
     @Override
+    public Caching<T> caching()
+    {
+        Preconditions.checkState(caching != null, "Caching is not enabled for 
this instance");
+        return caching;
+    }
+
+    @Override
     public AsyncStage<String> create(T model)
     {
         return create(model, null);
@@ -99,8 +146,17 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
     @Override
     public AsyncStage<String> create(T model, Stat storingStatIn)
     {
+        long dirtyZxid = getDirtyZxid();
         byte[] bytes = serializer.serialize(model);
-        return dslClient.create().withOptions(createOptions, createMode, 
aclList, storingStatIn).forPath(path, bytes);
+        AsyncStage<String> asyncStage = 
dslClient.create().withOptions(createOptions, createMode, fixAclList(aclList), 
storingStatIn).forPath(path.fullPath(), bytes);
+        ModelStage<String> modelStage = new ModelStage<>(asyncStage.event());
+        markDirtyCompleter(dirtyZxid, asyncStage, modelStage);
+        return modelStage;
+    }
+
+    private List<ACL> fixAclList(List<ACL> aclList)
+    {
+        return (aclList.size() > 0) ? aclList : null;   // workaround for old, 
bad design. empty list not accepted
     }
 
     @Override
@@ -109,9 +165,29 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
         return read(null);
     }
 
+    @VisibleForTesting
+    volatile AtomicInteger debugCachedReadCount = null;
+
     @Override
     public AsyncStage<T> read(Stat storingStatIn)
     {
+        ModeledCachedNode<T> node = getCached();
+        if ( node != null )
+        {
+            if ( node.getModel() != null )
+            {
+                if ( storingStatIn != null )
+                {
+                    DataTree.copyStat(node.getStat(), storingStatIn);
+                }
+                if ( debugCachedReadCount != null )
+                {
+                    debugCachedReadCount.incrementAndGet();
+                }
+                return new ModelStage<>(node.getModel());
+            }
+        }
+
         AsyncPathable<AsyncStage<byte[]>> next;
         if ( isCompressed() )
         {
@@ -121,7 +197,7 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
         {
             next = (storingStatIn != null) ? 
watchableClient.getData().storingStatIn(storingStatIn) : 
watchableClient.getData();
         }
-        AsyncStage<byte[]> asyncStage = next.forPath(path);
+        AsyncStage<byte[]> asyncStage = next.forPath(path.fullPath());
         ModelStage<T> modelStage = new ModelStage<>(asyncStage.event());
         asyncStage.whenComplete((value, e) -> {
             if ( e != null )
@@ -130,7 +206,14 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
             }
             else
             {
-                modelStage.complete(serializer.deserialize(value));
+                try
+                {
+                    modelStage.complete(serializer.deserialize(value));
+                }
+                catch ( Exception deserializeException )
+                {
+                    modelStage.completeExceptionally(deserializeException);
+                }
             }
         });
         return modelStage;
@@ -145,15 +228,29 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
     @Override
     public AsyncStage<Stat> update(T model, int version)
     {
+        long dirtyZxid = getDirtyZxid();
         byte[] bytes = serializer.serialize(model);
         AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? 
dslClient.setData().compressedWithVersion(version) : dslClient.setData();
-        return next.forPath(path, bytes);
+        AsyncStage<Stat> asyncStage = next.forPath(path.fullPath(), bytes);
+        ModelStage<Stat> modelStage = new ModelStage<>(asyncStage.event());
+        markDirtyCompleter(dirtyZxid, asyncStage, modelStage);
+        return modelStage;
     }
 
     @Override
     public AsyncStage<Stat> checkExists()
     {
-        return watchableClient.checkExists().forPath(path);
+        ModeledCachedNode<T> node = getCached();
+        if ( node != null )
+        {
+            AsyncStage<Stat> result = new ModelStage<>(node.getStat());
+            if ( debugCachedReadCount != null )
+            {
+                debugCachedReadCount.incrementAndGet();
+            }
+            return result;
+        }
+        return watchableClient.checkExists().forPath(path.fullPath());
     }
 
     @Override
@@ -165,13 +262,17 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
     @Override
     public AsyncStage<Void> delete(int version)
     {
-        return dslClient.delete().withVersion(-1).forPath(path);
+        long dirtyZxid = getDirtyZxid();
+        AsyncStage<Void> asyncStage = 
dslClient.delete().withVersion(-1).forPath(path.fullPath());
+        ModelStage<Void> modelStage = new ModelStage<>(asyncStage.event());
+        markDirtyCompleter(dirtyZxid, asyncStage, modelStage);
+        return modelStage;
     }
 
     @Override
     public AsyncStage<List<ZPath>> getChildren()
     {
-        AsyncStage<List<String>> asyncStage = 
watchableClient.getChildren().forPath(path);
+        AsyncStage<List<String>> asyncStage = 
watchableClient.getChildren().forPath(path.fullPath());
         ModelStage<List<ZPath>> modelStage = new 
ModelStage<>(asyncStage.event());
         asyncStage.whenComplete((children, e) -> {
             if ( e != null )
@@ -180,7 +281,7 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
             }
             else
             {
-                
modelStage.complete(children.stream().map(zPath::at).collect(Collectors.toList()));
+                
modelStage.complete(children.stream().map(path::at).collect(Collectors.toList()));
             }
         });
         return modelStage;
@@ -189,12 +290,60 @@ public class ModeledCuratorFrameworkImpl<T> implements 
ModeledCuratorFramework<T
     @Override
     public ModeledCuratorFramework<T> at(String child)
     {
-        String childPath = ZKPaths.makePath(path, child);
-        return new ModeledCuratorFrameworkImpl<>(client.unwrap(), childPath, 
serializer, watchMode, watcherFilter, unhandledErrorListener, resultFilter, 
createMode, aclList, createOptions, deleteOptions);
+        ZPath childPath = path.at(child);
+        CachingImpl<T> newCaching = (caching != null) ? caching.at(child) : 
null;
+        return new ModeledCuratorFrameworkImpl<>(
+            client,
+            dslClient,
+            watchableClient,
+            childPath, serializer,
+            watchMode,
+            watcherFilter,
+            unhandledErrorListener,
+            resultFilter,
+            createMode,
+            aclList,
+            createOptions,
+            deleteOptions,
+            newCaching
+        );
+    }
+
+    public static boolean isCompressed(Set<CreateOption> createOptions)
+    {
+        return createOptions.contains(CreateOption.compress);
+    }
+
+    private <U> void markDirtyCompleter(long dirtyZxid, AsyncStage<U> 
asyncStage, ModelStage<U> modelStage)
+    {
+        asyncStage.whenComplete((value, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                if ( caching != null )
+                {
+                    caching.markDirty(dirtyZxid);
+                }
+                modelStage.complete(value);
+            }
+        });
     }
 
     private boolean isCompressed()
     {
         return createOptions.contains(CreateOption.compress);
     }
+
+    private ModeledCachedNode<T> getCached()
+    {
+        return (caching != null) ? caching.getCacheIf() : null;
+    }
+
+    private long getDirtyZxid()
+    {
+        return (caching != null) ? caching.getCurrentZxid() : -1L;
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
index 35cae3c..75f2fe7 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java
@@ -27,10 +27,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class ZPathImpl implements ZPath
 {
     public static final ZPath root = new 
ZPathImpl(Collections.singletonList(ZKPaths.PATH_SEPARATOR));
+    public static final AtomicReference<String> fullPathCache = new 
AtomicReference<>();
+    public static final AtomicReference<String> parentPathCache = new 
AtomicReference<>();
 
     private final List<String> nodes;
 
@@ -66,14 +69,14 @@ public class ZPathImpl implements ZPath
     @Override
     public String fullPath()
     {
-        return buildFullPath(nodes.size());
+        return buildFullPath(false);
     }
 
     @Override
     public String parentPath()
     {
         checkRootAccess();
-        return buildFullPath(nodes.size() - 1);
+        return buildFullPath(true);
     }
 
     @Override
@@ -133,10 +136,18 @@ public class ZPathImpl implements ZPath
         }
     }
 
-    private String buildFullPath(int size)
+    private String buildFullPath(boolean parent)
     {
+        AtomicReference<String> cache = parent ? parentPathCache : 
fullPathCache;
+        String path = cache.get();
+        if ( path != null )
+        {
+            return path;
+        }
+
         boolean addSeparator = false;
         StringBuilder str = new StringBuilder();
+        int size = parent ? (nodes.size() - 1) : nodes.size();
         for ( int i = 0; i < size; ++i )
         {
             if ( i > 1 )
@@ -145,6 +156,9 @@ public class ZPathImpl implements ZPath
             }
             str.append(nodes.get(i));
         }
-        return str.toString();
+        path = str.toString();
+
+        cache.compareAndSet(null, path);
+        return path;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java
new file mode 100644
index 0000000..160ec7d
--- /dev/null
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.details;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.modeled.JacksonModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModeledCuratorFramework;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.caching.CachingOption;
+import org.apache.curator.x.async.modeled.models.TestSimpleModel;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestCachedModeledCuratorFramework extends 
CompletableBaseClassForTests
+{
+    private static final ZPath path = ZPath.parse("/test/path");
+    private CuratorFramework rawClient;
+    private ModeledCuratorFramework<TestSimpleModel> client;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        rawClient = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        rawClient.start();
+
+        ModelSerializer<TestSimpleModel> serializer = new 
JacksonModelSerializer<>(TestSimpleModel.class);
+        client = ModeledCuratorFramework.builder(rawClient, path, 
serializer).cached().build();
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        CloseableUtils.closeQuietly(rawClient);
+        super.teardown();
+    }
+
+    @Test
+    public void testBasic() throws InterruptedException
+    {
+        client.caching().start();
+
+        AtomicInteger counter = new AtomicInteger();
+        ((ModeledCuratorFrameworkImpl)client).debugCachedReadCount = counter;
+
+        complete(client.read());
+        Assert.assertEquals(counter.get(), 0);
+
+        complete(client.create(new TestSimpleModel("test", 10)));
+        Assert.assertEquals(counter.get(), 0);
+
+        timing.sleepABit();
+
+        complete(client.read());
+        Assert.assertEquals(counter.get(), 1);
+        counter.set(0);
+
+        complete(client.create(new TestSimpleModel("test2", 20)));
+        Assert.assertEquals(counter.get(), 0);
+
+        timing.sleepABit();
+
+        complete(client.read(), (model, e) -> Assert.assertEquals(model, new 
TestSimpleModel("test2", 20)));
+        Assert.assertEquals(counter.get(), 1);
+
+        client.caching().close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/models/TestSimpleModel.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/models/TestSimpleModel.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/models/TestSimpleModel.java
new file mode 100644
index 0000000..def50c5
--- /dev/null
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/models/TestSimpleModel.java
@@ -0,0 +1,66 @@
+package org.apache.curator.x.async.modeled.models;
+
+import java.util.Objects;
+
+public class TestSimpleModel
+{
+    private final String name;
+    private final int age;
+
+    public TestSimpleModel()
+    {
+        this("", 0);
+    }
+
+    public TestSimpleModel(String name, int age)
+    {
+        this.name = Objects.requireNonNull(name, "name cannot be null");
+        this.age = Objects.requireNonNull(age, "age cannot be null");
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    public int getAge()
+    {
+        return age;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        TestSimpleModel that = (TestSimpleModel)o;
+
+        //noinspection SimplifiableIfStatement
+        if ( age != that.age )
+        {
+            return false;
+        }
+        return name.equals(that.name);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = name.hashCode();
+        result = 31 * result + age;
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "TestSimpleModel{" + "name='" + name + '\'' + ", age=" + age + 
'}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/63225ba7/curator-x-async/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/resources/log4j.properties 
b/curator-x-async/src/test/resources/log4j.properties
new file mode 100644
index 0000000..2a85e0d
--- /dev/null
+++ b/curator-x-async/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=ERROR, console
+
+log4j.logger.org.apache.curator=DEBUG, console
+log4j.additivity.org.apache.curator=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n

Reply via email to