Repository: curator Updated Branches: refs/heads/CURATOR-3.0 836369284 -> 62f428c04
playing around with ideas for Java 8 wrappers Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f732a401 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f732a401 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f732a401 Branch: refs/heads/CURATOR-3.0 Commit: f732a4011423e9b27ec38bb4c3d8cf46238ca36f Parents: e683264 Author: randgalt <randg...@apache.org> Authored: Tue Jan 3 21:43:01 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Tue Jan 3 21:43:01 2017 -0500 ---------------------------------------------------------------------- curator-x-crimps/pom.xml | 42 ++++++++ .../curator/x/crimps/AsyncPathAndBytesable.java | 45 +++++++++ .../apache/curator/x/crimps/CrimpException.java | 38 +++++++ .../apache/curator/x/crimps/CrimpResult.java | 21 ++++ .../org/apache/curator/x/crimps/Crimped.java | 12 +++ .../apache/curator/x/crimps/CrimpedBytes.java | 11 ++ .../curator/x/crimps/CrimpedBytesImpl.java | 52 ++++++++++ .../apache/curator/x/crimps/CrimpedImpl.java | 46 +++++++++ .../org/apache/curator/x/crimps/Crimps.java | 101 +++++++++++++++++++ .../org/apache/curator/x/crimps/TestCrimps.java | 83 +++++++++++++++ pom.xml | 1 + 11 files changed, 452 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/pom.xml ---------------------------------------------------------------------- diff --git a/curator-x-crimps/pom.xml b/curator-x-crimps/pom.xml new file mode 100644 index 0000000..fd43b3f --- /dev/null +++ b/curator-x-crimps/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.curator</groupId> + <artifactId>apache-curator</artifactId> + <version>3.2.2-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>curator-x-crimps</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java new file mode 100644 index 0000000..320f28e --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/AsyncPathAndBytesable.java @@ -0,0 +1,45 @@ +package org.apache.curator.x.crimps; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.function.Function; +import java.util.function.Supplier; + +class AsyncPathAndBytesable<T> implements Supplier<T>, BackgroundCallback +{ + private final Function<CuratorEvent, CrimpResult<T>> resultFunction; + private final BlockingQueue<CrimpResult<T>> queue = new ArrayBlockingQueue<>(1); + + AsyncPathAndBytesable(Function<CuratorEvent, CrimpResult<T>> resultFunction) + { + this.resultFunction = resultFunction; + } + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + queue.offer(resultFunction.apply(event)); + } + + @Override + public T get() + { + try + { + CrimpResult<T> result = queue.take(); + if ( result.exception != null ) + { + throw new CrimpException(result.exception); + } + return result.value; + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + throw new CrimpException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpException.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpException.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpException.java new file mode 100644 index 0000000..e36b15e --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpException.java @@ -0,0 +1,38 @@ +package org.apache.curator.x.crimps; + +import org.apache.zookeeper.KeeperException; + +public class CrimpException extends RuntimeException +{ + private final KeeperException keeperException; + + public static KeeperException unwrap(Throwable e) + { + while ( e != null ) + { + if ( e instanceof KeeperException ) + { + return (KeeperException)e; + } + e = e.getCause(); + } + return null; + } + + public CrimpException(KeeperException keeperException) + { + super(keeperException); + this.keeperException = keeperException; + } + + public CrimpException(Throwable cause) + { + super(cause); + keeperException = null; + } + + public KeeperException getKeeperException() + { + return keeperException; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java new file mode 100644 index 0000000..602cfe1 --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpResult.java @@ -0,0 +1,21 @@ +package org.apache.curator.x.crimps; + +import org.apache.zookeeper.KeeperException; + +class CrimpResult<T> +{ + final T value; + final KeeperException exception; + + CrimpResult(T value) + { + this.value = value; + this.exception = null; + } + + public CrimpResult(KeeperException exception) + { + this.value = null; + this.exception = exception; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java new file mode 100644 index 0000000..862ca4b --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimped.java @@ -0,0 +1,12 @@ +package org.apache.curator.x.crimps; + +import org.apache.curator.framework.api.Backgroundable; +import org.apache.curator.framework.api.ErrorListenerPathable; +import org.apache.curator.framework.api.Pathable; +import java.util.concurrent.CompletableFuture; + +public interface Crimped<T> extends + ErrorListenerPathable<CompletableFuture<T>>, + Pathable<CompletableFuture<T>> +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java new file mode 100644 index 0000000..3c5b39e --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytes.java @@ -0,0 +1,11 @@ +package org.apache.curator.x.crimps; + +import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; +import org.apache.curator.framework.api.PathAndBytesable; +import java.util.concurrent.CompletableFuture; + +public interface CrimpedBytes<T> extends + ErrorListenerPathAndBytesable<CompletableFuture<T>>, + PathAndBytesable<CompletableFuture<T>> +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java new file mode 100644 index 0000000..4d6e477 --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedBytesImpl.java @@ -0,0 +1,52 @@ +package org.apache.curator.x.crimps; + +import org.apache.curator.framework.api.BackgroundPathAndBytesable; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; +import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Function; + +class CrimpedBytesImpl<T> implements CrimpedBytes<T> +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final BackgroundPathAndBytesable<T> builder; + private final Function<CuratorEvent, CrimpResult<T>> resultFunction; + private final Executor executor; + private UnhandledErrorListener unhandledErrorListener; + + CrimpedBytesImpl(BackgroundPathAndBytesable<T> builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> resultFunction) + { + this.executor = executor; + this.builder = builder; + this.resultFunction = resultFunction; + unhandledErrorListener = null; + } + + @Override + public PathAndBytesable<CompletableFuture<T>> withUnhandledErrorListener(UnhandledErrorListener listener) + { + unhandledErrorListener = listener; + return this; + } + + @Override + public CompletableFuture<T> forPath(String path) throws Exception + { + return forPath(path, null); + } + + @Override + public CompletableFuture<T> forPath(String path, byte[] data) throws Exception + { + AsyncPathAndBytesable<T> supplier = new AsyncPathAndBytesable<T>(resultFunction); + ErrorListenerPathAndBytesable<T> localBuilder = builder.inBackground(supplier); + PathAndBytesable<T> finalLocalBuilder = (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder; + finalLocalBuilder.forPath(path, data); + return CompletableFuture.supplyAsync(supplier, executor); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java new file mode 100644 index 0000000..52a47ed --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/CrimpedImpl.java @@ -0,0 +1,46 @@ +package org.apache.curator.x.crimps; + +import org.apache.curator.framework.api.BackgroundPathable; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.ErrorListenerPathable; +import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Function; + +class CrimpedImpl<T> implements Crimped<T> +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final BackgroundPathable<T> builder; + private final Function<CuratorEvent, CrimpResult<T>> resultFunction; + private final Executor executor; + private UnhandledErrorListener unhandledErrorListener; + + CrimpedImpl(BackgroundPathable<T> builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> resultFunction) + { + this.executor = executor; + this.builder = builder; + this.resultFunction = resultFunction; + unhandledErrorListener = null; + } + + @Override + public Pathable<CompletableFuture<T>> withUnhandledErrorListener(UnhandledErrorListener listener) + { + unhandledErrorListener = listener; + return this; + } + + @Override + public CompletableFuture<T> forPath(String path) throws Exception + { + AsyncPathAndBytesable<T> supplier = new AsyncPathAndBytesable<T>(resultFunction); + ErrorListenerPathable<T> localBuilder = builder.inBackground(supplier); + Pathable<T> finalLocalBuilder = (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder; + finalLocalBuilder.forPath(path); + return CompletableFuture.supplyAsync(supplier, executor); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java new file mode 100644 index 0000000..52ff4a1 --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/Crimps.java @@ -0,0 +1,101 @@ +package org.apache.curator.x.crimps; + +import org.apache.curator.framework.api.BackgroundPathAndBytesable; +import org.apache.curator.framework.api.BackgroundPathable; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Function; + +public class Crimps +{ + private final Executor executor; + + private static final Function<CuratorEvent, CrimpResult<String>> nameSupplier = makeSupplier(CuratorEvent::getName); + private static final Function<CuratorEvent, CrimpResult<String>> pathSupplier = makeSupplier(CuratorEvent::getPath); + private static final Function<CuratorEvent, CrimpResult<Void>> voidSupplier = makeSupplier(e -> null); + private static final Function<CuratorEvent, CrimpResult<byte[]>> dataSupplier = makeSupplier(CuratorEvent::getData); + private static final Function<CuratorEvent, CrimpResult<Stat>> statSupplier = makeSupplier(CuratorEvent::getStat); + private static final Function<CuratorEvent, CrimpResult<List<String>>> childrenSupplier = makeSupplier(CuratorEvent::getChildren); + private static final Function<CuratorEvent, CrimpResult<List<ACL>>> aclSupplier = makeSupplier(CuratorEvent::getACLList); + + private static <T> Function<CuratorEvent, CrimpResult<T>> makeSupplier(Function<CuratorEvent, T> proc) + { + return event -> (event.getResultCode() == 0) ? new CrimpResult<>(proc.apply(event)) : asException(event); + } + + private static <T> CrimpResult<T> asException(CuratorEvent event) + { + return new CrimpResult<>(KeeperException.create(KeeperException.Code.get(event.getResultCode()))); + } + + public CrimpedBytes<String> nameInBackground(BackgroundPathAndBytesable<String> builder) + { + return build(builder, executor, nameSupplier); + } + + public CrimpedBytes<String> pathInBackground(BackgroundPathAndBytesable<String> builder) + { + return build(builder, executor, pathSupplier); + } + + public Crimped<Void> voidInBackground(BackgroundPathable<Void> builder) + { + return build(builder, executor, voidSupplier); + } + + public Crimped<byte[]> dataInBackground(BackgroundPathable<byte[]> builder) + { + return build(builder, executor, dataSupplier); + } + + public Crimped<List<String>> childrenInBackground(BackgroundPathable<List<String>> builder) + { + return build(builder, executor, childrenSupplier); + } + + public Crimped<Stat> statInBackground(BackgroundPathable<Stat> builder) + { + return build(builder, executor, statSupplier); + } + + public Crimped<List<ACL>> aclsInBackground(BackgroundPathable<List<ACL>> builder) + { + return build(builder, executor, aclSupplier); + } + + public CrimpedBytes<Stat> statBytesInBackground(BackgroundPathAndBytesable<Stat> builder) + { + return build(builder, executor, statSupplier); + } + + public static <T> CrimpedBytes<T> build(BackgroundPathAndBytesable<T> builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> supplier) + { + return new CrimpedBytesImpl<>(builder, executor, supplier); + } + + public static <T> Crimped<T> build(BackgroundPathable<T> builder, Executor executor, Function<CuratorEvent, CrimpResult<T>> supplier) + { + return new CrimpedImpl<>(builder, executor, supplier); + } + + public static Crimps newCrimps() + { + return new Crimps(ForkJoinPool.commonPool()); + } + + public static Crimps newCrimps(Executor executor) + { + return new Crimps(executor); + } + + private Crimps(Executor executor) // TODO + { + this.executor = Objects.requireNonNull(executor, "executor cannot be null"); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java new file mode 100644 index 0000000..083b727 --- /dev/null +++ b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/TestCrimps.java @@ -0,0 +1,83 @@ +package org.apache.curator.x.crimps; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class TestCrimps extends BaseClassForTests +{ + private final Crimps crimps = Crimps.newCrimps(); + + @Test + public void testCreateAndSet() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + CompletableFuture<String> f = crimps.nameInBackground(client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath("/a/b/c"); + String path = f.get(); + Assert.assertEquals(path, "/a/b/c0000000000"); + + f = crimps.nameInBackground(client.create()).forPath("/foo/bar"); + assertException(f, KeeperException.Code.NONODE); + + CompletableFuture<Stat> statFuture = crimps.statBytesInBackground(client.setData()).forPath(path, "hey".getBytes()); + Stat stat = statFuture.get(); + Assert.assertNotNull(stat); + } + } + + @Test + public void testDelete() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + client.create().forPath("/test"); + + CompletableFuture<Void> f = crimps.voidInBackground(client.delete()).forPath("/test"); + Void result = f.get(); + Assert.assertEquals(result, null); + + f = crimps.voidInBackground(client.delete()).forPath("/test"); + assertException(f, KeeperException.Code.NONODE); + } + } + + @Test + public void testGetData() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + client.create().forPath("/test", "foo".getBytes()); + + CompletableFuture<byte[]> f = crimps.dataInBackground(client.getData()).forPath("/test"); + byte[] data = f.get(); + Assert.assertEquals(data, "foo".getBytes()); + } + } + + public void assertException(CompletableFuture<?> f, KeeperException.Code code) throws InterruptedException + { + try + { + f.get(); + Assert.fail(); + } + catch ( ExecutionException e ) + { + KeeperException keeperException = CrimpException.unwrap(e); + Assert.assertNotNull(keeperException); + Assert.assertEquals(keeperException.code(), code); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f732a401/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f327806..d56386f 100644 --- a/pom.xml +++ b/pom.xml @@ -286,6 +286,7 @@ <module>curator-x-discovery</module> <module>curator-x-discovery-server</module> <module>curator-x-rpc</module> + <module>curator-x-crimps</module> </modules> <dependencyManagement>