This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 190cd654a GH-1246. Asynchronously initialize cache before reading II
(#1250)
190cd654a is described below
commit 190cd654a623d968d52ddc4a05ca5396bc6fa562
Author: Luke Kot-Zaniewski <[email protected]>
AuthorDate: Fri Mar 21 04:21:23 2025 -0400
GH-1246. Asynchronously initialize cache before reading II (#1250)
This closes #1246 and closes #1247.
---
.../details/CachedModeledFrameworkImpl.java | 138 ++++++++++++++-------
.../async/modeled/TestCachedModeledFramework.java | 93 +++++++++++++-
2 files changed, 178 insertions(+), 53 deletions(-)
diff --git
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index acccbfecf..791e04523 100644
---
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -21,7 +21,6 @@ package org.apache.curator.x.async.modeled.details;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
@@ -48,15 +47,40 @@ class CachedModeledFrameworkImpl<T> implements
CachedModeledFramework<T> {
private final ModeledFramework<T> client;
private final ModeledCacheImpl<T> cache;
private final Executor executor;
+ private final ModelStage<Void> init;
CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService
executor) {
- this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(),
client.modelSpec(), executor), executor);
- }
-
- private CachedModeledFrameworkImpl(ModeledFramework<T> client,
ModeledCacheImpl<T> cache, Executor executor) {
+ this(
+ client,
+ new ModeledCacheImpl<>(client.unwrap().unwrap(),
client.modelSpec(), executor),
+ executor,
+ ModelStage.make());
+ listenable().addListener(new ModeledCacheListener<T>() {
+ @Override
+ public void accept(Type type, ZPath path, Stat stat, Object model)
{
+ // NOP
+ }
+
+ @Override
+ public void initialized() {
+ init.complete(null);
+ ModeledCacheListener.super.initialized();
+ }
+
+ @Override
+ public void handleException(Exception e) {
+ init.completeExceptionally(e);
+ ModeledCacheListener.super.handleException(e);
+ }
+ });
+ }
+
+ private CachedModeledFrameworkImpl(
+ ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor
executor, ModelStage<Void> init) {
this.client = client;
this.cache = cache;
this.executor = executor;
+ this.init = init;
}
@Override
@@ -106,7 +130,7 @@ class CachedModeledFrameworkImpl<T> implements
CachedModeledFramework<T> {
@Override
public CachedModeledFramework<T> child(Object child) {
- return new CachedModeledFrameworkImpl<>(client.child(child), cache,
executor);
+ return new CachedModeledFrameworkImpl<>(client.child(child), cache,
executor, init);
}
@Override
@@ -117,7 +141,7 @@ class CachedModeledFrameworkImpl<T> implements
CachedModeledFramework<T> {
@Override
public CachedModeledFramework<T> withPath(ZPath path) {
- return new CachedModeledFrameworkImpl<>(client.withPath(path), cache,
executor);
+ return new CachedModeledFrameworkImpl<>(client.withPath(path), cache,
executor, init);
}
@Override
@@ -142,24 +166,22 @@ class CachedModeledFrameworkImpl<T> implements
CachedModeledFramework<T> {
@Override
public AsyncStage<T> read() {
- return internalRead(ZNode::model, this::exceptionally);
+ return internalRead(ZNode::model);
}
@Override
public AsyncStage<T> read(Stat storingStatIn) {
- return internalRead(
- n -> {
- if (storingStatIn != null) {
- DataTree.copyStat(n.stat(), storingStatIn);
- }
- return n.model();
- },
- this::exceptionally);
+ return internalRead(n -> {
+ if (storingStatIn != null) {
+ DataTree.copyStat(n.stat(), storingStatIn);
+ }
+ return n.model();
+ });
}
@Override
public AsyncStage<ZNode<T>> readAsZNode() {
- return internalRead(Function.identity(), this::exceptionally);
+ return internalRead(Function.identity());
}
@Override
@@ -179,9 +201,7 @@ class CachedModeledFrameworkImpl<T> implements
CachedModeledFramework<T> {
@Override
public AsyncStage<List<T>> list() {
- List<T> children =
-
cache.currentChildren().values().stream().map(ZNode::model).collect(Collectors.toList());
- return ModelStage.completed(children);
+ return internalChildren(entry -> entry.getValue().model());
}
@Override
@@ -206,28 +226,17 @@ class CachedModeledFrameworkImpl<T> implements
CachedModeledFramework<T> {
@Override
public AsyncStage<Stat> checkExists() {
- ZPath path = client.modelSpec().path();
- Optional<ZNode<T>> data = cache.currentData(path);
- return data.map(node -> completed(node.stat())).orElseGet(() ->
completed(null));
+ return internalRead(ZNode::stat, () -> ModelStage.completed(null));
}
@Override
public AsyncStage<List<ZPath>> children() {
- List<ZPath> paths =
cache.currentChildren(client.modelSpec().path()).keySet().stream()
- .filter(path -> !path.isRoot()
- && path.parent().equals(client.modelSpec().path()))
- .collect(Collectors.toList());
- return completed(paths);
+ return internalChildren(Map.Entry::getKey);
}
@Override
public AsyncStage<List<ZNode<T>>> childrenAsZNodes() {
- List<ZNode<T>> nodes =
cache.currentChildren(client.modelSpec().path()).entrySet().stream()
- .filter(e -> !e.getKey().isRoot()
- &&
e.getKey().parent().equals(client.modelSpec().path()))
- .map(Map.Entry::getValue)
- .collect(Collectors.toList());
- return completed(nodes);
+ return internalChildren(Map.Entry::getValue);
}
@Override
@@ -270,19 +279,52 @@ class CachedModeledFrameworkImpl<T> implements
CachedModeledFramework<T> {
return client.inTransaction(operations);
}
- private <U> AsyncStage<U> completed(U value) {
- return ModelStage.completed(value);
- }
-
- private <U> AsyncStage<U> exceptionally() {
- KeeperException.NoNodeException exception =
- new
KeeperException.NoNodeException(client.modelSpec().path().fullPath());
- return ModelStage.exceptionally(exception);
- }
-
- private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver,
Supplier<AsyncStage<U>> elseProc) {
- ZPath path = client.modelSpec().path();
- Optional<ZNode<T>> data = cache.currentData(path);
- return data.map(node ->
completed(resolver.apply(node))).orElseGet(elseProc);
+ private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver) {
+ return internalRead(resolver, null);
+ }
+
+ private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver,
Supplier<AsyncStage<U>> defaultSupplier) {
+ ModelStage<U> stage = ModelStage.make();
+ init.whenComplete((__, throwable) -> {
+ if (throwable == null) {
+ ZPath path = client.modelSpec().path();
+ ZNode<T> zNode = cache.currentData(path).orElse(null);
+ if (zNode == null) {
+ if (defaultSupplier == null) {
+ stage.completeExceptionally(new
KeeperException.NoNodeException(
+ client.modelSpec().path().fullPath()));
+ } else {
+ defaultSupplier.get().whenComplete((elseData,
elseThrowable) -> {
+ if (elseThrowable == null) {
+ stage.complete(elseData);
+ } else {
+ stage.completeExceptionally(elseThrowable);
+ }
+ });
+ }
+ } else {
+ stage.complete(resolver.apply(zNode));
+ }
+ } else {
+ stage.completeExceptionally(throwable);
+ }
+ });
+ return stage;
+ }
+
+ private <U> ModelStage<List<U>> internalChildren(Function<Map.Entry<ZPath,
ZNode<T>>, U> resolver) {
+ ModelStage<List<U>> stage = ModelStage.make();
+ init.whenComplete((__, throwable) -> {
+ if (throwable == null) {
+
stage.complete(cache.currentChildren(client.modelSpec().path()).entrySet().stream()
+ .filter(e -> !e.getKey().isRoot()
+ &&
e.getKey().parent().equals(client.modelSpec().path()))
+ .map(resolver)
+ .collect(Collectors.toList()));
+ } else {
+ stage.completeExceptionally(throwable);
+ }
+ });
+ return stage;
}
}
diff --git
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 65c68aba9..0e767ea37 100644
---
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -19,11 +19,7 @@
package org.apache.curator.x.async.modeled;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.math.BigInteger;
@@ -32,7 +28,9 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -41,9 +39,11 @@ import java.util.stream.Stream;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.curator.x.async.modeled.models.TestModel;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -78,6 +78,10 @@ public class TestCachedModeledFramework extends
TestModeledFrameworkBase {
assertNotNull(value);
assertNull(e);
}));
+ complete(client.child(model).checkExists().whenComplete((value, e)
-> {
+ assertNotNull(value);
+ assertNull(e);
+ }));
} finally {
client.close();
}
@@ -150,6 +154,13 @@ public class TestCachedModeledFramework extends
TestModeledFrameworkBase {
complete(
client.child("p").child("c2").childrenAsZNodes(),
(v, e) -> assertEquals(toSet(v.stream(), ZNode::model),
Sets.newHashSet(grandChild2)));
+
+ complete(
+ client.child("p").child("c1").list(),
+ (v, e) -> assertEquals(toSet(v.stream(),
Function.identity()), Sets.newHashSet(grandChild1)));
+ complete(
+ client.child("p").child("c2").list(),
+ (v, e) -> assertEquals(toSet(v.stream(),
Function.identity()), Sets.newHashSet(grandChild2)));
}
}
@@ -201,6 +212,78 @@ public class TestCachedModeledFramework extends
TestModeledFrameworkBase {
verifyEmptyNodeDeserialization(byteModel, byteModelSpec);
}
+ @Test
+ void testInitializedCachedModeledFramework() throws ExecutionException,
InterruptedException, TimeoutException {
+ try (CachedModeledFramework<TestModel> client =
+ ModeledFramework.wrap(async, modelSpec).cached()) {
+ TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
+ AsyncStage<TestModel> asyncModel = client.read();
+ client.start();
+ assertEquals(model,
timing.getFuture(asyncModel.toCompletableFuture()));
+ }
+ }
+
+ @Test
+ void testNoNodeException() throws InterruptedException, TimeoutException {
+ try (CachedModeledFramework<TestModel> client =
+ ModeledFramework.wrap(async, modelSpec).cached()) {
+ AsyncStage<TestModel> asyncModel = client.read();
+ client.start();
+ try {
+ timing.getFuture(asyncModel.toCompletableFuture());
+ fail("This test should result in a NoNodeException");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof
KeeperException.NoNodeException);
+ }
+
+ complete(client.checkExists().whenComplete((value, e) -> {
+ assertNull(value);
+ assertNull(e);
+ }));
+ }
+ }
+
+ @Test
+ void testReadThrough() throws InterruptedException, TimeoutException,
ExecutionException {
+ try (CachedModeledFramework<TestModel> client =
+ ModeledFramework.wrap(async, modelSpec).cached()) {
+ TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
+ Semaphore semaphore = new Semaphore(0);
+ client.listenable().addListener(new
ModeledCacheListener<TestModel>() {
+ @Override
+ public void accept(Type type, ZPath path, Stat stat, TestModel
model) {
+ // NOP
+ }
+
+ @Override
+ public void initialized() {
+ semaphore.release();
+ }
+ });
+ client.start();
+ assertTrue(timing.acquireSemaphore(semaphore));
+
assertNotNull(timing.getFuture(client.set(model).toCompletableFuture()));
+ assertEquals(model,
timing.getFuture(client.readThrough().toCompletableFuture()));
+ }
+ }
+
+ @Test
+ void testReadThroughFailure() throws InterruptedException,
TimeoutException, ExecutionException {
+ try (CachedModeledFramework<TestModel> client =
+ ModeledFramework.wrap(async, modelSpec).cached()) {
+ client.start();
+
assertNull(timing.getFuture(client.delete().toCompletableFuture()));
+ complete(client.readThrough(), (d, e) -> {
+ if (e == null) {
+ fail("This test should result in a NoNodeException");
+ } else {
+ assertEquals(KeeperException.NoNodeException.class,
e.getClass());
+ }
+ });
+ }
+ }
+
private <T> void verifyEmptyNodeDeserialization(T model, ModelSpec<T>
parentModelSpec) {
// The sub-path is the ZNode that will be removed that does not
contain any model data. Their should be no
// attempt to deserialize this empty ZNode.