Using a blocking queue was incorrect. This is more Scala-like now. I misunderstood how Promises work in Scala
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c53f59dc Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c53f59dc Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c53f59dc Branch: refs/heads/CURATOR-3.0 Commit: c53f59dcc048006bce0a5df3c0718ccdbb39774f Parents: a5c460c Author: randgalt <randg...@apache.org> Authored: Thu Jan 5 13:18:18 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Thu Jan 5 13:18:18 2017 -0500 ---------------------------------------------------------------------- .../curator/x/crimps/AsyncPathAndBytesable.java | 28 ++------ .../apache/curator/x/crimps/CrimpResult.java | 2 +- .../org/apache/curator/x/crimps/Crimped.java | 7 +- .../apache/curator/x/crimps/CrimpedBytes.java | 6 +- .../curator/x/crimps/CrimpedBytesImpl.java | 15 ++-- .../apache/curator/x/crimps/CrimpedImpl.java | 13 ++-- .../org/apache/curator/x/crimps/Crimps.java | 58 ++++++---------- .../org/apache/curator/x/crimps/TestCrimps.java | 72 +++++++++++++------- 8 files changed, 95 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java index 320f28e..f30bc23 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java @@ -3,15 +3,12 @@ package org.apache.curator.x.crimps; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; -import java.util.function.Supplier; -class AsyncPathAndBytesable<T> implements Supplier<T>, BackgroundCallback +class AsyncPathAndBytesable<T> extends CompletableFuture<T> implements BackgroundCallback { private final Function<CuratorEvent, CrimpResult<T>> resultFunction; - private final BlockingQueue<CrimpResult<T>> queue = new ArrayBlockingQueue<>(1); AsyncPathAndBytesable(Function<CuratorEvent, CrimpResult<T>> resultFunction) { @@ -21,25 +18,14 @@ class AsyncPathAndBytesable<T> implements Supplier<T>, BackgroundCallback @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - queue.offer(resultFunction.apply(event)); - } - - @Override - public T get() - { - try + CrimpResult<T> result = resultFunction.apply(event); + if ( result.exception != null ) { - CrimpResult<T> result = queue.take(); - if ( result.exception != null ) - { - throw new CrimpException(result.exception); - } - return result.value; + completeExceptionally(result.exception); } - catch ( InterruptedException e ) + else { - Thread.currentThread().interrupt(); - throw new CrimpException(e); + complete(result.value); } } } http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java index 602cfe1..849c711 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java @@ -13,7 +13,7 @@ class CrimpResult<T> this.exception = null; } - public CrimpResult(KeeperException exception) + CrimpResult(KeeperException exception) { this.value = null; this.exception = exception; http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java index 862ca4b..8d18898 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java @@ -1,12 +1,11 @@ package org.apache.curator.x.crimps; -import org.apache.curator.framework.api.Backgroundable; import org.apache.curator.framework.api.ErrorListenerPathable; import org.apache.curator.framework.api.Pathable; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; public interface Crimped<T> extends - ErrorListenerPathable<CompletableFuture<T>>, - Pathable<CompletableFuture<T>> + ErrorListenerPathable<CompletionStage<T>>, + Pathable<CompletionStage<T>> { } http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java index 3c5b39e..3ce0b8e 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java @@ -2,10 +2,10 @@ package org.apache.curator.x.crimps; import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; import org.apache.curator.framework.api.PathAndBytesable; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; public interface CrimpedBytes<T> extends - ErrorListenerPathAndBytesable<CompletableFuture<T>>, - PathAndBytesable<CompletableFuture<T>> + ErrorListenerPathAndBytesable<CompletionStage<T>>, + PathAndBytesable<CompletionStage<T>> { } http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java index 4d6e477..a7828c9 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java @@ -7,8 +7,7 @@ import org.apache.curator.framework.api.PathAndBytesable; import org.apache.curator.framework.api.UnhandledErrorListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; +import java.util.concurrent.CompletionStage; import java.util.function.Function; class CrimpedBytesImpl<T> implements CrimpedBytes<T> @@ -16,37 +15,35 @@ class CrimpedBytesImpl<T> implements CrimpedBytes<T> private final Logger log = LoggerFactory.getLogger(getClass()); private final BackgroundPathAndBytesable<T> builder; private final Function<CuratorEvent, CrimpResult<T>> resultFunction; - private final Executor executor; private UnhandledErrorListener unhandledErrorListener; - CrimpedBytesImpl(BackgroundPathAndBytesable<T> builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> resultFunction) + CrimpedBytesImpl(BackgroundPathAndBytesable<T> builder, Function<CuratorEvent, CrimpResult<T>> resultFunction) { - this.executor = executor; this.builder = builder; this.resultFunction = resultFunction; unhandledErrorListener = null; } @Override - public PathAndBytesable<CompletableFuture<T>> withUnhandledErrorListener(UnhandledErrorListener listener) + public PathAndBytesable<CompletionStage<T>> withUnhandledErrorListener(UnhandledErrorListener listener) { unhandledErrorListener = listener; return this; } @Override - public CompletableFuture<T> forPath(String path) throws Exception + public CompletionStage<T> forPath(String path) throws Exception { return forPath(path, null); } @Override - public CompletableFuture<T> forPath(String path, byte[] data) throws Exception + public CompletionStage<T> forPath(String path, byte[] data) throws Exception { AsyncPathAndBytesable<T> supplier = new AsyncPathAndBytesable<T>(resultFunction); ErrorListenerPathAndBytesable<T> localBuilder = builder.inBackground(supplier); PathAndBytesable<T> finalLocalBuilder = (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder; finalLocalBuilder.forPath(path, data); - return CompletableFuture.supplyAsync(supplier, executor); + return supplier; } } http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java index 52a47ed..268ab48 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java @@ -7,8 +7,7 @@ import org.apache.curator.framework.api.Pathable; import org.apache.curator.framework.api.UnhandledErrorListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; +import java.util.concurrent.CompletionStage; import java.util.function.Function; class CrimpedImpl<T> implements Crimped<T> @@ -16,31 +15,29 @@ class CrimpedImpl<T> implements Crimped<T> private final Logger log = LoggerFactory.getLogger(getClass()); private final BackgroundPathable<T> builder; private final Function<CuratorEvent, CrimpResult<T>> resultFunction; - private final Executor executor; private UnhandledErrorListener unhandledErrorListener; - CrimpedImpl(BackgroundPathable<T> builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> resultFunction) + CrimpedImpl(BackgroundPathable<T> builder, Function<CuratorEvent, CrimpResult<T>> resultFunction) { - this.executor = executor; this.builder = builder; this.resultFunction = resultFunction; unhandledErrorListener = null; } @Override - public Pathable<CompletableFuture<T>> withUnhandledErrorListener(UnhandledErrorListener listener) + public Pathable<CompletionStage<T>> withUnhandledErrorListener(UnhandledErrorListener listener) { unhandledErrorListener = listener; return this; } @Override - public CompletableFuture<T> forPath(String path) throws Exception + public CompletionStage<T> forPath(String path) throws Exception { AsyncPathAndBytesable<T> supplier = new AsyncPathAndBytesable<T>(resultFunction); ErrorListenerPathable<T> localBuilder = builder.inBackground(supplier); Pathable<T> finalLocalBuilder = (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder; finalLocalBuilder.forPath(path); - return CompletableFuture.supplyAsync(supplier, executor); + return supplier; } } http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java index 1e42ad6..440aeca 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java @@ -1,6 +1,5 @@ package org.apache.curator.x.crimps; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.curator.framework.api.BackgroundPathAndBytesable; import org.apache.curator.framework.api.BackgroundPathable; import org.apache.curator.framework.api.CuratorEvent; @@ -8,14 +7,10 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import java.util.List; -import java.util.Objects; -import java.util.concurrent.Executor; import java.util.function.Function; public class Crimps { - private final Executor executor; - private static final Function<CuratorEvent, CrimpResult<String>> nameSupplier = makeSupplier(CuratorEvent::getName); private static final Function<CuratorEvent, CrimpResult<String>> pathSupplier = makeSupplier(CuratorEvent::getPath); private static final Function<CuratorEvent, CrimpResult<Void>> voidSupplier = makeSupplier(e -> null); @@ -34,68 +29,57 @@ public class Crimps return new CrimpResult<>(KeeperException.create(KeeperException.Code.get(event.getResultCode()))); } - public CrimpedBytes<String> nameInBackground(BackgroundPathAndBytesable<String> builder) - { - return build(builder, executor, nameSupplier); - } - - public CrimpedBytes<String> pathInBackground(BackgroundPathAndBytesable<String> builder) - { - return build(builder, executor, pathSupplier); - } - - public Crimped<Void> voidInBackground(BackgroundPathable<Void> builder) + public static CrimpedBytes<String> nameInBackground(BackgroundPathAndBytesable<String> builder) { - return build(builder, executor, voidSupplier); + return build(builder, nameSupplier); } - public Crimped<byte[]> dataInBackground(BackgroundPathable<byte[]> builder) + public static CrimpedBytes<String> pathInBackground(BackgroundPathAndBytesable<String> builder) { - return build(builder, executor, dataSupplier); + return build(builder, pathSupplier); } - public Crimped<List<String>> childrenInBackground(BackgroundPathable<List<String>> builder) + public static Crimped<Void> voidInBackground(BackgroundPathable<Void> builder) { - return build(builder, executor, childrenSupplier); + return build(builder, voidSupplier); } - public Crimped<Stat> statInBackground(BackgroundPathable<Stat> builder) + public static Crimped<byte[]> dataInBackground(BackgroundPathable<byte[]> builder) { - return build(builder, executor, statSupplier); + return build(builder, dataSupplier); } - public Crimped<List<ACL>> aclsInBackground(BackgroundPathable<List<ACL>> builder) + public static Crimped<List<String>> childrenInBackground(BackgroundPathable<List<String>> builder) { - return build(builder, executor, aclSupplier); + return build(builder, childrenSupplier); } - public CrimpedBytes<Stat> statBytesInBackground(BackgroundPathAndBytesable<Stat> builder) + public static Crimped<Stat> statInBackground(BackgroundPathable<Stat> builder) { - return build(builder, executor, statSupplier); + return build(builder, statSupplier); } - public static <T> CrimpedBytes<T> build(BackgroundPathAndBytesable<T> builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> supplier) + public static Crimped<List<ACL>> aclsInBackground(BackgroundPathable<List<ACL>> builder) { - return new CrimpedBytesImpl<>(builder, executor, supplier); + return build(builder, aclSupplier); } - public static <T> Crimped<T> build(BackgroundPathable<T> builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> supplier) + public static CrimpedBytes<Stat> statBytesInBackground(BackgroundPathAndBytesable<Stat> builder) { - return new CrimpedImpl<>(builder, executor, supplier); + return build(builder, statSupplier); } - public static Crimps newCrimps() + public static <T> CrimpedBytes<T> build(BackgroundPathAndBytesable<T> builder, Function<CuratorEvent, CrimpResult<T>> supplier) { - return new Crimps(MoreExecutors.sameThreadExecutor()); + return new CrimpedBytesImpl<>(builder, supplier); } - public static Crimps newCrimps(Executor executor) + public static <T> Crimped<T> build(BackgroundPathable<T> builder, Function<CuratorEvent, CrimpResult<T>> supplier) { - return new Crimps(executor); + return new CrimpedImpl<>(builder, supplier); } - private Crimps(Executor executor) // TODO + private Crimps() { - this.executor = Objects.requireNonNull(executor, "executor cannot be null"); } } http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java index 083b727..17c760b 100644 --- a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java +++ b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java @@ -9,29 +9,31 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.testng.Assert; import org.testng.annotations.Test; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; public class TestCrimps extends BaseClassForTests { - private final Crimps crimps = Crimps.newCrimps(); - @Test public void testCreateAndSet() throws Exception { try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) { client.start(); - CompletableFuture<String> f = crimps.nameInBackground(client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath("/a/b/c"); - String path = f.get(); - Assert.assertEquals(path, "/a/b/c0000000000"); + CompletionStage<String> f = Crimps.nameInBackground(client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath("/a/b/c"); + complete(f.handle((path, e) -> { + Assert.assertEquals(path, "/a/b/c0000000000"); + return null; + })); - f = crimps.nameInBackground(client.create()).forPath("/foo/bar"); + f = Crimps.nameInBackground(client.create()).forPath("/foo/bar"); assertException(f, KeeperException.Code.NONODE); - CompletableFuture<Stat> statFuture = crimps.statBytesInBackground(client.setData()).forPath(path, "hey".getBytes()); - Stat stat = statFuture.get(); - Assert.assertNotNull(stat); + CompletionStage<Stat> statFuture = Crimps.statBytesInBackground(client.setData()).forPath("/a/b/c0000000000", "hey".getBytes()); + complete(statFuture.handle((stat, e) -> { + Assert.assertNotNull(stat); + return null; + })); } } @@ -43,11 +45,14 @@ public class TestCrimps extends BaseClassForTests client.start(); client.create().forPath("/test"); - CompletableFuture<Void> f = crimps.voidInBackground(client.delete()).forPath("/test"); - Void result = f.get(); - Assert.assertEquals(result, null); + CompletionStage<Void> f = Crimps.voidInBackground(client.delete()).forPath("/test"); + complete(f.handle((v, e) -> { + Assert.assertEquals(v, null); + Assert.assertEquals(e, null); + return null; + })); - f = crimps.voidInBackground(client.delete()).forPath("/test"); + f = Crimps.voidInBackground(client.delete()).forPath("/test"); assertException(f, KeeperException.Code.NONODE); } } @@ -60,24 +65,45 @@ public class TestCrimps extends BaseClassForTests client.start(); client.create().forPath("/test", "foo".getBytes()); - CompletableFuture<byte[]> f = crimps.dataInBackground(client.getData()).forPath("/test"); - byte[] data = f.get(); - Assert.assertEquals(data, "foo".getBytes()); + CompletionStage<byte[]> f = Crimps.dataInBackground(client.getData()).forPath("/test"); + complete(f.handle((data, e) -> { + Assert.assertEquals(data, "foo".getBytes()); + return null; + })); } } - public void assertException(CompletableFuture<?> f, KeeperException.Code code) throws InterruptedException + public void assertException(CompletionStage<?> f, KeeperException.Code code) throws Exception + { + complete(f.handle((value, e) -> { + if ( e == null ) + { + Assert.fail(code + " expected"); + } + KeeperException keeperException = CrimpException.unwrap(e); + Assert.assertNotNull(keeperException); + Assert.assertEquals(keeperException.code(), code); + return null; + })); + } + + private void complete(CompletionStage<?> f) throws Exception { try { - f.get(); - Assert.fail(); + f.toCompletableFuture().get(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); } catch ( ExecutionException e ) { - KeeperException keeperException = CrimpException.unwrap(e); - Assert.assertNotNull(keeperException); - Assert.assertEquals(keeperException.code(), code); + if ( e.getCause() instanceof AssertionError ) + { + throw ((AssertionError)e.getCause()); + } + throw e; } } }