Using a blocking queue was incorrect. This is more Scala-like now. I 
misunderstood how Promises work in Scala


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c53f59dc
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c53f59dc
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c53f59dc

Branch: refs/heads/CURATOR-3.0
Commit: c53f59dcc048006bce0a5df3c0718ccdbb39774f
Parents: a5c460c
Author: randgalt <randg...@apache.org>
Authored: Thu Jan 5 13:18:18 2017 -0500
Committer: randgalt <randg...@apache.org>
Committed: Thu Jan 5 13:18:18 2017 -0500

----------------------------------------------------------------------
 .../curator/x/crimps/AsyncPathAndBytesable.java | 28 ++------
 .../apache/curator/x/crimps/CrimpResult.java    |  2 +-
 .../org/apache/curator/x/crimps/Crimped.java    |  7 +-
 .../apache/curator/x/crimps/CrimpedBytes.java   |  6 +-
 .../curator/x/crimps/CrimpedBytesImpl.java      | 15 ++--
 .../apache/curator/x/crimps/CrimpedImpl.java    | 13 ++--
 .../org/apache/curator/x/crimps/Crimps.java     | 58 ++++++----------
 .../org/apache/curator/x/crimps/TestCrimps.java | 72 +++++++++++++-------
 8 files changed, 95 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java
----------------------------------------------------------------------
diff --git 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java
 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java
index 320f28e..f30bc23 100644
--- 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java
+++ 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java
@@ -3,15 +3,12 @@ package org.apache.curator.x.crimps;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
-class AsyncPathAndBytesable<T> implements Supplier<T>, BackgroundCallback
+class AsyncPathAndBytesable<T> extends CompletableFuture<T> implements 
BackgroundCallback
 {
     private final Function<CuratorEvent, CrimpResult<T>> resultFunction;
-    private final BlockingQueue<CrimpResult<T>> queue = new 
ArrayBlockingQueue<>(1);
 
     AsyncPathAndBytesable(Function<CuratorEvent, CrimpResult<T>> 
resultFunction)
     {
@@ -21,25 +18,14 @@ class AsyncPathAndBytesable<T> implements Supplier<T>, 
BackgroundCallback
     @Override
     public void processResult(CuratorFramework client, CuratorEvent event) 
throws Exception
     {
-        queue.offer(resultFunction.apply(event));
-    }
-
-    @Override
-    public T get()
-    {
-        try
+        CrimpResult<T> result = resultFunction.apply(event);
+        if ( result.exception != null )
         {
-            CrimpResult<T> result = queue.take();
-            if ( result.exception != null )
-            {
-                throw new CrimpException(result.exception);
-            }
-            return result.value;
+            completeExceptionally(result.exception);
         }
-        catch ( InterruptedException e )
+        else
         {
-            Thread.currentThread().interrupt();
-            throw new CrimpException(e);
+            complete(result.value);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java
----------------------------------------------------------------------
diff --git 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java
index 602cfe1..849c711 100644
--- 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java
+++ 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java
@@ -13,7 +13,7 @@ class CrimpResult<T>
         this.exception = null;
     }
 
-    public CrimpResult(KeeperException exception)
+    CrimpResult(KeeperException exception)
     {
         this.value = null;
         this.exception = exception;

http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java
----------------------------------------------------------------------
diff --git 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java
index 862ca4b..8d18898 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java
@@ -1,12 +1,11 @@
 package org.apache.curator.x.crimps;
 
-import org.apache.curator.framework.api.Backgroundable;
 import org.apache.curator.framework.api.ErrorListenerPathable;
 import org.apache.curator.framework.api.Pathable;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 
 public interface Crimped<T> extends
-    ErrorListenerPathable<CompletableFuture<T>>,
-    Pathable<CompletableFuture<T>>
+    ErrorListenerPathable<CompletionStage<T>>,
+    Pathable<CompletionStage<T>>
 {
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java
----------------------------------------------------------------------
diff --git 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java
index 3c5b39e..3ce0b8e 100644
--- 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java
+++ 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java
@@ -2,10 +2,10 @@ package org.apache.curator.x.crimps;
 
 import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
 import org.apache.curator.framework.api.PathAndBytesable;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 
 public interface CrimpedBytes<T> extends
-    ErrorListenerPathAndBytesable<CompletableFuture<T>>,
-    PathAndBytesable<CompletableFuture<T>>
+    ErrorListenerPathAndBytesable<CompletionStage<T>>,
+    PathAndBytesable<CompletionStage<T>>
 {
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java
 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java
index 4d6e477..a7828c9 100644
--- 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java
+++ 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java
@@ -7,8 +7,7 @@ import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
 
 class CrimpedBytesImpl<T> implements CrimpedBytes<T>
@@ -16,37 +15,35 @@ class CrimpedBytesImpl<T> implements CrimpedBytes<T>
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final BackgroundPathAndBytesable<T> builder;
     private final Function<CuratorEvent, CrimpResult<T>> resultFunction;
-    private final Executor executor;
     private UnhandledErrorListener unhandledErrorListener;
 
-    CrimpedBytesImpl(BackgroundPathAndBytesable<T> builder, Executor executor, 
Function<CuratorEvent, CrimpResult<T>> resultFunction)
+    CrimpedBytesImpl(BackgroundPathAndBytesable<T> builder, 
Function<CuratorEvent, CrimpResult<T>> resultFunction)
     {
-        this.executor = executor;
         this.builder = builder;
         this.resultFunction = resultFunction;
         unhandledErrorListener = null;
     }
 
     @Override
-    public PathAndBytesable<CompletableFuture<T>> 
withUnhandledErrorListener(UnhandledErrorListener listener)
+    public PathAndBytesable<CompletionStage<T>> 
withUnhandledErrorListener(UnhandledErrorListener listener)
     {
         unhandledErrorListener = listener;
         return this;
     }
 
     @Override
-    public CompletableFuture<T> forPath(String path) throws Exception
+    public CompletionStage<T> forPath(String path) throws Exception
     {
         return forPath(path, null);
     }
 
     @Override
-    public CompletableFuture<T> forPath(String path, byte[] data) throws 
Exception
+    public CompletionStage<T> forPath(String path, byte[] data) throws 
Exception
     {
         AsyncPathAndBytesable<T> supplier = new 
AsyncPathAndBytesable<T>(resultFunction);
         ErrorListenerPathAndBytesable<T> localBuilder = 
builder.inBackground(supplier);
         PathAndBytesable<T> finalLocalBuilder = (unhandledErrorListener != 
null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : 
localBuilder;
         finalLocalBuilder.forPath(path, data);
-        return CompletableFuture.supplyAsync(supplier, executor);
+        return supplier;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java
index 52a47ed..268ab48 100644
--- 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java
+++ 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java
@@ -7,8 +7,7 @@ import org.apache.curator.framework.api.Pathable;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
 
 class CrimpedImpl<T> implements Crimped<T>
@@ -16,31 +15,29 @@ class CrimpedImpl<T> implements Crimped<T>
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final BackgroundPathable<T> builder;
     private final Function<CuratorEvent, CrimpResult<T>> resultFunction;
-    private final Executor executor;
     private UnhandledErrorListener unhandledErrorListener;
 
-    CrimpedImpl(BackgroundPathable<T> builder, Executor executor, 
Function<CuratorEvent, CrimpResult<T>> resultFunction)
+    CrimpedImpl(BackgroundPathable<T> builder, Function<CuratorEvent, 
CrimpResult<T>> resultFunction)
     {
-        this.executor = executor;
         this.builder = builder;
         this.resultFunction = resultFunction;
         unhandledErrorListener = null;
     }
 
     @Override
-    public Pathable<CompletableFuture<T>> 
withUnhandledErrorListener(UnhandledErrorListener listener)
+    public Pathable<CompletionStage<T>> 
withUnhandledErrorListener(UnhandledErrorListener listener)
     {
         unhandledErrorListener = listener;
         return this;
     }
 
     @Override
-    public CompletableFuture<T> forPath(String path) throws Exception
+    public CompletionStage<T> forPath(String path) throws Exception
     {
         AsyncPathAndBytesable<T> supplier = new 
AsyncPathAndBytesable<T>(resultFunction);
         ErrorListenerPathable<T> localBuilder = builder.inBackground(supplier);
         Pathable<T> finalLocalBuilder = (unhandledErrorListener != null) ? 
localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder;
         finalLocalBuilder.forPath(path);
-        return CompletableFuture.supplyAsync(supplier, executor);
+        return supplier;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java
----------------------------------------------------------------------
diff --git 
a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java 
b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java
index 1e42ad6..440aeca 100644
--- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java
+++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java
@@ -1,6 +1,5 @@
 package org.apache.curator.x.crimps;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.curator.framework.api.BackgroundPathAndBytesable;
 import org.apache.curator.framework.api.BackgroundPathable;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -8,14 +7,10 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Executor;
 import java.util.function.Function;
 
 public class Crimps
 {
-    private final Executor executor;
-
     private static final Function<CuratorEvent, CrimpResult<String>> 
nameSupplier = makeSupplier(CuratorEvent::getName);
     private static final Function<CuratorEvent, CrimpResult<String>> 
pathSupplier = makeSupplier(CuratorEvent::getPath);
     private static final Function<CuratorEvent, CrimpResult<Void>> 
voidSupplier = makeSupplier(e -> null);
@@ -34,68 +29,57 @@ public class Crimps
         return new 
CrimpResult<>(KeeperException.create(KeeperException.Code.get(event.getResultCode())));
     }
 
-    public CrimpedBytes<String> 
nameInBackground(BackgroundPathAndBytesable<String> builder)
-    {
-        return build(builder, executor, nameSupplier);
-    }
-
-    public CrimpedBytes<String> 
pathInBackground(BackgroundPathAndBytesable<String> builder)
-    {
-        return build(builder, executor, pathSupplier);
-    }
-
-    public Crimped<Void> voidInBackground(BackgroundPathable<Void> builder)
+    public static CrimpedBytes<String> 
nameInBackground(BackgroundPathAndBytesable<String> builder)
     {
-        return build(builder, executor, voidSupplier);
+        return build(builder, nameSupplier);
     }
 
-    public Crimped<byte[]> dataInBackground(BackgroundPathable<byte[]> builder)
+    public static CrimpedBytes<String> 
pathInBackground(BackgroundPathAndBytesable<String> builder)
     {
-        return build(builder, executor, dataSupplier);
+        return build(builder, pathSupplier);
     }
 
-    public Crimped<List<String>> 
childrenInBackground(BackgroundPathable<List<String>> builder)
+    public static Crimped<Void> voidInBackground(BackgroundPathable<Void> 
builder)
     {
-        return build(builder, executor, childrenSupplier);
+        return build(builder, voidSupplier);
     }
 
-    public Crimped<Stat> statInBackground(BackgroundPathable<Stat> builder)
+    public static Crimped<byte[]> dataInBackground(BackgroundPathable<byte[]> 
builder)
     {
-        return build(builder, executor, statSupplier);
+        return build(builder, dataSupplier);
     }
 
-    public Crimped<List<ACL>> aclsInBackground(BackgroundPathable<List<ACL>> 
builder)
+    public static Crimped<List<String>> 
childrenInBackground(BackgroundPathable<List<String>> builder)
     {
-        return build(builder, executor, aclSupplier);
+        return build(builder, childrenSupplier);
     }
 
-    public CrimpedBytes<Stat> 
statBytesInBackground(BackgroundPathAndBytesable<Stat> builder)
+    public static Crimped<Stat> statInBackground(BackgroundPathable<Stat> 
builder)
     {
-        return build(builder, executor, statSupplier);
+        return build(builder, statSupplier);
     }
 
-    public static <T> CrimpedBytes<T> build(BackgroundPathAndBytesable<T> 
builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> supplier)
+    public static Crimped<List<ACL>> 
aclsInBackground(BackgroundPathable<List<ACL>> builder)
     {
-        return new CrimpedBytesImpl<>(builder, executor, supplier);
+        return build(builder, aclSupplier);
     }
 
-    public static <T> Crimped<T> build(BackgroundPathable<T> builder, Executor 
executor, Function<CuratorEvent, CrimpResult<T>> supplier)
+    public static CrimpedBytes<Stat> 
statBytesInBackground(BackgroundPathAndBytesable<Stat> builder)
     {
-        return new CrimpedImpl<>(builder, executor, supplier);
+        return build(builder, statSupplier);
     }
 
-    public static Crimps newCrimps()
+    public static <T> CrimpedBytes<T> build(BackgroundPathAndBytesable<T> 
builder, Function<CuratorEvent, CrimpResult<T>> supplier)
     {
-        return new Crimps(MoreExecutors.sameThreadExecutor());
+        return new CrimpedBytesImpl<>(builder, supplier);
     }
 
-    public static Crimps newCrimps(Executor executor)
+    public static <T> Crimped<T> build(BackgroundPathable<T> builder, 
Function<CuratorEvent, CrimpResult<T>> supplier)
     {
-        return new Crimps(executor);
+        return new CrimpedImpl<>(builder, supplier);
     }
 
-    private Crimps(Executor executor)    // TODO
+    private Crimps()
     {
-        this.executor = Objects.requireNonNull(executor, "executor cannot be 
null");
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c53f59dc/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java
----------------------------------------------------------------------
diff --git 
a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java 
b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java
index 083b727..17c760b 100644
--- a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java
+++ b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java
@@ -9,29 +9,31 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 
 public class TestCrimps extends BaseClassForTests
 {
-    private final Crimps crimps = Crimps.newCrimps();
-
     @Test
     public void testCreateAndSet() throws Exception
     {
         try ( CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1)) )
         {
             client.start();
-            CompletableFuture<String> f = 
crimps.nameInBackground(client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath("/a/b/c");
-            String path = f.get();
-            Assert.assertEquals(path, "/a/b/c0000000000");
+            CompletionStage<String> f = 
Crimps.nameInBackground(client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath("/a/b/c");
+            complete(f.handle((path, e) -> {
+                Assert.assertEquals(path, "/a/b/c0000000000");
+                return null;
+            }));
 
-            f = crimps.nameInBackground(client.create()).forPath("/foo/bar");
+            f = Crimps.nameInBackground(client.create()).forPath("/foo/bar");
             assertException(f, KeeperException.Code.NONODE);
 
-            CompletableFuture<Stat> statFuture = 
crimps.statBytesInBackground(client.setData()).forPath(path, "hey".getBytes());
-            Stat stat = statFuture.get();
-            Assert.assertNotNull(stat);
+            CompletionStage<Stat> statFuture = 
Crimps.statBytesInBackground(client.setData()).forPath("/a/b/c0000000000", 
"hey".getBytes());
+            complete(statFuture.handle((stat, e) -> {
+                Assert.assertNotNull(stat);
+                return null;
+            }));
         }
     }
 
@@ -43,11 +45,14 @@ public class TestCrimps extends BaseClassForTests
             client.start();
             client.create().forPath("/test");
 
-            CompletableFuture<Void> f = 
crimps.voidInBackground(client.delete()).forPath("/test");
-            Void result = f.get();
-            Assert.assertEquals(result, null);
+            CompletionStage<Void> f = 
Crimps.voidInBackground(client.delete()).forPath("/test");
+            complete(f.handle((v, e) -> {
+                Assert.assertEquals(v, null);
+                Assert.assertEquals(e, null);
+                return null;
+            }));
 
-            f = crimps.voidInBackground(client.delete()).forPath("/test");
+            f = Crimps.voidInBackground(client.delete()).forPath("/test");
             assertException(f, KeeperException.Code.NONODE);
         }
     }
@@ -60,24 +65,45 @@ public class TestCrimps extends BaseClassForTests
             client.start();
             client.create().forPath("/test", "foo".getBytes());
 
-            CompletableFuture<byte[]> f = 
crimps.dataInBackground(client.getData()).forPath("/test");
-            byte[] data = f.get();
-            Assert.assertEquals(data, "foo".getBytes());
+            CompletionStage<byte[]> f = 
Crimps.dataInBackground(client.getData()).forPath("/test");
+            complete(f.handle((data, e) -> {
+                Assert.assertEquals(data, "foo".getBytes());
+                return null;
+            }));
         }
     }
 
-    public void assertException(CompletableFuture<?> f, KeeperException.Code 
code) throws InterruptedException
+    public void assertException(CompletionStage<?> f, KeeperException.Code 
code) throws Exception
+    {
+        complete(f.handle((value, e) -> {
+            if ( e == null )
+            {
+                Assert.fail(code + " expected");
+            }
+            KeeperException keeperException = CrimpException.unwrap(e);
+            Assert.assertNotNull(keeperException);
+            Assert.assertEquals(keeperException.code(), code);
+            return null;
+        }));
+    }
+
+    private void complete(CompletionStage<?> f) throws Exception
     {
         try
         {
-            f.get();
-            Assert.fail();
+            f.toCompletableFuture().get();
+        }
+        catch ( InterruptedException e )
+        {
+            Thread.currentThread().interrupt();
         }
         catch ( ExecutionException e )
         {
-            KeeperException keeperException = CrimpException.unwrap(e);
-            Assert.assertNotNull(keeperException);
-            Assert.assertEquals(keeperException.code(), code);
+            if ( e.getCause() instanceof AssertionError )
+            {
+                throw ((AssertionError)e.getCause());
+            }
+            throw e;
         }
     }
 }

Reply via email to