This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 190cd654a GH-1246. Asynchronously initialize cache before reading II 
(#1250)
190cd654a is described below

commit 190cd654a623d968d52ddc4a05ca5396bc6fa562
Author: Luke Kot-Zaniewski <[email protected]>
AuthorDate: Fri Mar 21 04:21:23 2025 -0400

    GH-1246. Asynchronously initialize cache before reading II (#1250)
    
    This closes #1246 and closes #1247.
---
 .../details/CachedModeledFrameworkImpl.java        | 138 ++++++++++++++-------
 .../async/modeled/TestCachedModeledFramework.java  |  93 +++++++++++++-
 2 files changed, 178 insertions(+), 53 deletions(-)

diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index acccbfecf..791e04523 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -21,7 +21,6 @@ package org.apache.curator.x.async.modeled.details;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
@@ -48,15 +47,40 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T> {
     private final ModeledFramework<T> client;
     private final ModeledCacheImpl<T> cache;
     private final Executor executor;
+    private final ModelStage<Void> init;
 
     CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService 
executor) {
-        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), 
client.modelSpec(), executor), executor);
-    }
-
-    private CachedModeledFrameworkImpl(ModeledFramework<T> client, 
ModeledCacheImpl<T> cache, Executor executor) {
+        this(
+                client,
+                new ModeledCacheImpl<>(client.unwrap().unwrap(), 
client.modelSpec(), executor),
+                executor,
+                ModelStage.make());
+        listenable().addListener(new ModeledCacheListener<T>() {
+            @Override
+            public void accept(Type type, ZPath path, Stat stat, Object model) 
{
+                // NOP
+            }
+
+            @Override
+            public void initialized() {
+                init.complete(null);
+                ModeledCacheListener.super.initialized();
+            }
+
+            @Override
+            public void handleException(Exception e) {
+                init.completeExceptionally(e);
+                ModeledCacheListener.super.handleException(e);
+            }
+        });
+    }
+
+    private CachedModeledFrameworkImpl(
+            ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor 
executor, ModelStage<Void> init) {
         this.client = client;
         this.cache = cache;
         this.executor = executor;
+        this.init = init;
     }
 
     @Override
@@ -106,7 +130,7 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T> {
 
     @Override
     public CachedModeledFramework<T> child(Object child) {
-        return new CachedModeledFrameworkImpl<>(client.child(child), cache, 
executor);
+        return new CachedModeledFrameworkImpl<>(client.child(child), cache, 
executor, init);
     }
 
     @Override
@@ -117,7 +141,7 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T> {
 
     @Override
     public CachedModeledFramework<T> withPath(ZPath path) {
-        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, 
executor);
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, 
executor, init);
     }
 
     @Override
@@ -142,24 +166,22 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T> {
 
     @Override
     public AsyncStage<T> read() {
-        return internalRead(ZNode::model, this::exceptionally);
+        return internalRead(ZNode::model);
     }
 
     @Override
     public AsyncStage<T> read(Stat storingStatIn) {
-        return internalRead(
-                n -> {
-                    if (storingStatIn != null) {
-                        DataTree.copyStat(n.stat(), storingStatIn);
-                    }
-                    return n.model();
-                },
-                this::exceptionally);
+        return internalRead(n -> {
+            if (storingStatIn != null) {
+                DataTree.copyStat(n.stat(), storingStatIn);
+            }
+            return n.model();
+        });
     }
 
     @Override
     public AsyncStage<ZNode<T>> readAsZNode() {
-        return internalRead(Function.identity(), this::exceptionally);
+        return internalRead(Function.identity());
     }
 
     @Override
@@ -179,9 +201,7 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T> {
 
     @Override
     public AsyncStage<List<T>> list() {
-        List<T> children =
-                
cache.currentChildren().values().stream().map(ZNode::model).collect(Collectors.toList());
-        return ModelStage.completed(children);
+        return internalChildren(entry -> entry.getValue().model());
     }
 
     @Override
@@ -206,28 +226,17 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T> {
 
     @Override
     public AsyncStage<Stat> checkExists() {
-        ZPath path = client.modelSpec().path();
-        Optional<ZNode<T>> data = cache.currentData(path);
-        return data.map(node -> completed(node.stat())).orElseGet(() -> 
completed(null));
+        return internalRead(ZNode::stat, () -> ModelStage.completed(null));
     }
 
     @Override
     public AsyncStage<List<ZPath>> children() {
-        List<ZPath> paths = 
cache.currentChildren(client.modelSpec().path()).keySet().stream()
-                .filter(path -> !path.isRoot()
-                        && path.parent().equals(client.modelSpec().path()))
-                .collect(Collectors.toList());
-        return completed(paths);
+        return internalChildren(Map.Entry::getKey);
     }
 
     @Override
     public AsyncStage<List<ZNode<T>>> childrenAsZNodes() {
-        List<ZNode<T>> nodes = 
cache.currentChildren(client.modelSpec().path()).entrySet().stream()
-                .filter(e -> !e.getKey().isRoot()
-                        && 
e.getKey().parent().equals(client.modelSpec().path()))
-                .map(Map.Entry::getValue)
-                .collect(Collectors.toList());
-        return completed(nodes);
+        return internalChildren(Map.Entry::getValue);
     }
 
     @Override
@@ -270,19 +279,52 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T> {
         return client.inTransaction(operations);
     }
 
-    private <U> AsyncStage<U> completed(U value) {
-        return ModelStage.completed(value);
-    }
-
-    private <U> AsyncStage<U> exceptionally() {
-        KeeperException.NoNodeException exception =
-                new 
KeeperException.NoNodeException(client.modelSpec().path().fullPath());
-        return ModelStage.exceptionally(exception);
-    }
-
-    private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, 
Supplier<AsyncStage<U>> elseProc) {
-        ZPath path = client.modelSpec().path();
-        Optional<ZNode<T>> data = cache.currentData(path);
-        return data.map(node -> 
completed(resolver.apply(node))).orElseGet(elseProc);
+    private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver) {
+        return internalRead(resolver, null);
+    }
+
+    private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, 
Supplier<AsyncStage<U>> defaultSupplier) {
+        ModelStage<U> stage = ModelStage.make();
+        init.whenComplete((__, throwable) -> {
+            if (throwable == null) {
+                ZPath path = client.modelSpec().path();
+                ZNode<T> zNode = cache.currentData(path).orElse(null);
+                if (zNode == null) {
+                    if (defaultSupplier == null) {
+                        stage.completeExceptionally(new 
KeeperException.NoNodeException(
+                                client.modelSpec().path().fullPath()));
+                    } else {
+                        defaultSupplier.get().whenComplete((elseData, 
elseThrowable) -> {
+                            if (elseThrowable == null) {
+                                stage.complete(elseData);
+                            } else {
+                                stage.completeExceptionally(elseThrowable);
+                            }
+                        });
+                    }
+                } else {
+                    stage.complete(resolver.apply(zNode));
+                }
+            } else {
+                stage.completeExceptionally(throwable);
+            }
+        });
+        return stage;
+    }
+
+    private <U> ModelStage<List<U>> internalChildren(Function<Map.Entry<ZPath, 
ZNode<T>>, U> resolver) {
+        ModelStage<List<U>> stage = ModelStage.make();
+        init.whenComplete((__, throwable) -> {
+            if (throwable == null) {
+                
stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream()
+                        .filter(e -> !e.getKey().isRoot()
+                                && 
e.getKey().parent().equals(client.modelSpec().path()))
+                        .map(resolver)
+                        .collect(Collectors.toList()));
+            } else {
+                stage.completeExceptionally(throwable);
+            }
+        });
+        return stage;
     }
 }
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 65c68aba9..0e767ea37 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -19,11 +19,7 @@
 
 package org.apache.curator.x.async.modeled;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.math.BigInteger;
@@ -32,7 +28,9 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -41,9 +39,11 @@ import java.util.stream.Stream;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.models.TestModel;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -78,6 +78,10 @@ public class TestCachedModeledFramework extends 
TestModeledFrameworkBase {
                 assertNotNull(value);
                 assertNull(e);
             }));
+            complete(client.child(model).checkExists().whenComplete((value, e) 
-> {
+                assertNotNull(value);
+                assertNull(e);
+            }));
         } finally {
             client.close();
         }
@@ -150,6 +154,13 @@ public class TestCachedModeledFramework extends 
TestModeledFrameworkBase {
             complete(
                     client.child("p").child("c2").childrenAsZNodes(),
                     (v, e) -> assertEquals(toSet(v.stream(), ZNode::model), 
Sets.newHashSet(grandChild2)));
+
+            complete(
+                    client.child("p").child("c1").list(),
+                    (v, e) -> assertEquals(toSet(v.stream(), 
Function.identity()), Sets.newHashSet(grandChild1)));
+            complete(
+                    client.child("p").child("c2").list(),
+                    (v, e) -> assertEquals(toSet(v.stream(), 
Function.identity()), Sets.newHashSet(grandChild2)));
         }
     }
 
@@ -201,6 +212,78 @@ public class TestCachedModeledFramework extends 
TestModeledFrameworkBase {
         verifyEmptyNodeDeserialization(byteModel, byteModelSpec);
     }
 
+    @Test
+    void testInitializedCachedModeledFramework() throws ExecutionException, 
InterruptedException, TimeoutException {
+        try (CachedModeledFramework<TestModel> client =
+                ModeledFramework.wrap(async, modelSpec).cached()) {
+            TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+            
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
+            AsyncStage<TestModel> asyncModel = client.read();
+            client.start();
+            assertEquals(model, 
timing.getFuture(asyncModel.toCompletableFuture()));
+        }
+    }
+
+    @Test
+    void testNoNodeException() throws InterruptedException, TimeoutException {
+        try (CachedModeledFramework<TestModel> client =
+                ModeledFramework.wrap(async, modelSpec).cached()) {
+            AsyncStage<TestModel> asyncModel = client.read();
+            client.start();
+            try {
+                timing.getFuture(asyncModel.toCompletableFuture());
+                fail("This test should result in a NoNodeException");
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof 
KeeperException.NoNodeException);
+            }
+
+            complete(client.checkExists().whenComplete((value, e) -> {
+                assertNull(value);
+                assertNull(e);
+            }));
+        }
+    }
+
+    @Test
+    void testReadThrough() throws InterruptedException, TimeoutException, 
ExecutionException {
+        try (CachedModeledFramework<TestModel> client =
+                ModeledFramework.wrap(async, modelSpec).cached()) {
+            TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+            Semaphore semaphore = new Semaphore(0);
+            client.listenable().addListener(new 
ModeledCacheListener<TestModel>() {
+                @Override
+                public void accept(Type type, ZPath path, Stat stat, TestModel 
model) {
+                    // NOP
+                }
+
+                @Override
+                public void initialized() {
+                    semaphore.release();
+                }
+            });
+            client.start();
+            assertTrue(timing.acquireSemaphore(semaphore));
+            
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
+            assertEquals(model, 
timing.getFuture(client.readThrough().toCompletableFuture()));
+        }
+    }
+
+    @Test
+    void testReadThroughFailure() throws InterruptedException, 
TimeoutException, ExecutionException {
+        try (CachedModeledFramework<TestModel> client =
+                ModeledFramework.wrap(async, modelSpec).cached()) {
+            client.start();
+            
assertNull(timing.getFuture(client.delete().toCompletableFuture()));
+            complete(client.readThrough(), (d, e) -> {
+                if (e == null) {
+                    fail("This test should result in a NoNodeException");
+                } else {
+                    assertEquals(KeeperException.NoNodeException.class, 
e.getClass());
+                }
+            });
+        }
+    }
+
     private <T> void verifyEmptyNodeDeserialization(T model, ModelSpec<T> 
parentModelSpec) {
         // The sub-path is the ZNode that will be removed that does not 
contain any model data. Their should be no
         // attempt to deserialize this empty ZNode.

Reply via email to