added support for transactions
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7689f33e Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7689f33e Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7689f33e Branch: refs/heads/CURATOR-3.0 Commit: 7689f33eab9e75fc5f443043bb7dd3675c1530dc Parents: 3d0fdba Author: randgalt <randg...@apache.org> Authored: Thu Jan 5 16:32:58 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Thu Jan 5 16:32:58 2017 -0500 ---------------------------------------------------------------------- .../curator/x/crimps/async/AsyncCrimps.java | 52 +++++++++++++------- .../x/crimps/async/CrimpedMultiTransaction.java | 38 ++++++++++++++ .../curator/x/crimps/async/TestCrimps.java | 26 ++++++++++ 3 files changed, 97 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/7689f33e/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java index d4c1545..d9324a3 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java @@ -19,6 +19,8 @@ package org.apache.curator.x.crimps.async; import org.apache.curator.framework.api.*; +import org.apache.curator.framework.api.transaction.CuratorMultiTransactionMain; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; @@ -28,17 +30,18 @@ import java.util.function.Function; public class AsyncCrimps { - public static final BackgroundProc<String> nameSupplier = makeSupplier(CuratorEvent::getName); - public static final BackgroundProc<String> pathSupplier = makeSupplier(CuratorEvent::getPath); - public static final BackgroundProc<Void> voidSupplier = makeSupplier(e -> null); - public static final BackgroundProc<byte[]> dataSupplier = makeSupplier(CuratorEvent::getData); - public static final BackgroundProc<Stat> statSupplier = makeSupplier(CuratorEvent::getStat); - public static final BackgroundProc<List<String>> childrenSupplier = makeSupplier(CuratorEvent::getChildren); - public static final BackgroundProc<List<ACL>> aclSupplier = makeSupplier(CuratorEvent::getACLList); + public static final BackgroundProc<String> nameProc = makeProc(CuratorEvent::getName); + public static final BackgroundProc<String> pathProc = makeProc(CuratorEvent::getPath); + public static final BackgroundProc<Void> ignoredProc = makeProc(e -> null); + public static final BackgroundProc<byte[]> dataProc = makeProc(CuratorEvent::getData); + public static final BackgroundProc<Stat> statProc = makeProc(CuratorEvent::getStat); + public static final BackgroundProc<List<String>> childrenProc = makeProc(CuratorEvent::getChildren); + public static final BackgroundProc<List<ACL>> aclProc = makeProc(CuratorEvent::getACLList); + public static final BackgroundProc<List<CuratorTransactionResult>> opResultsProc = makeProc(CuratorEvent::getOpResults); private final UnhandledErrorListener unhandledErrorListener; - public static <T> BackgroundProc<T> makeSupplier(Function<CuratorEvent, T> proc) + public static <T> BackgroundProc<T> makeProc(Function<CuratorEvent, T> proc) { return (event, future) -> { if ( event.getResultCode() == 0 ) @@ -65,47 +68,47 @@ public class AsyncCrimps public CrimpedPathAndBytesable<String> name(BackgroundPathAndBytesable<String> builder) { - return build(builder, nameSupplier); + return build(builder, nameProc); } public CrimpedPathAndBytesable<String> path(BackgroundPathAndBytesable<String> builder) { - return build(builder, pathSupplier); + return build(builder, pathProc); } public CrimpedPathable<Void> ignored(BackgroundPathable<Void> builder) { - return build(builder, voidSupplier); + return build(builder, ignoredProc); } public CrimpedPathable<byte[]> data(BackgroundPathable<byte[]> builder) { - return build(builder, dataSupplier); + return build(builder, dataProc); } public CrimpedPathable<List<String>> children(BackgroundPathable<List<String>> builder) { - return build(builder, childrenSupplier); + return build(builder, childrenProc); } public CrimpedPathable<Stat> stat(BackgroundPathable<Stat> builder) { - return build(builder, statSupplier); + return build(builder, statProc); } public CrimpedPathable<List<ACL>> acls(BackgroundPathable<List<ACL>> builder) { - return build(builder, aclSupplier); + return build(builder, aclProc); } public CrimpedPathAndBytesable<Stat> statBytes(BackgroundPathAndBytesable<Stat> builder) { - return build(builder, statSupplier); + return build(builder, statProc); } public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerEnsembleable<byte[]>> builder) { - CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataSupplier); + CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc); Ensembleable<byte[]> main; if ( unhandledErrorListener != null ) @@ -122,7 +125,7 @@ public class AsyncCrimps public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerReconfigBuilderMain> builder, List<String> newMembers) { - CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataSupplier); + CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc); ReconfigBuilderMain main; if ( unhandledErrorListener != null ) @@ -139,7 +142,7 @@ public class AsyncCrimps public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerReconfigBuilderMain> builder, List<String> joining, List<String> leaving) { - CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataSupplier); + CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc); ReconfigBuilderMain main; if ( unhandledErrorListener != null ) @@ -172,6 +175,17 @@ public class AsyncCrimps return new CrimpedEnsembleableImpl(configBuilder, callback); } + public CrimpedMultiTransaction opResults(Backgroundable<ErrorListenerMultiTransactionMain> builder) + { + CrimpedBackgroundCallback<List<CuratorTransactionResult>> callback = new CrimpedBackgroundCallback<>(opResultsProc); + ErrorListenerMultiTransactionMain main = builder.inBackground(callback); + CuratorMultiTransactionMain finalBuilder = (unhandledErrorListener != null) ? main.withUnhandledErrorListener(unhandledErrorListener) : main; + return ops -> { + finalBuilder.forOperations(ops); + return callback; + }; + } + public <T> CrimpedPathAndBytesable<T> build(BackgroundPathAndBytesable<T> builder, BackgroundProc<T> backgroundProc) { CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc); http://git-wip-us.apache.org/repos/asf/curator/blob/7689f33e/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedMultiTransaction.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedMultiTransaction.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedMultiTransaction.java new file mode 100644 index 0000000..b010624 --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedMultiTransaction.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.crimps.async; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import java.util.List; +import java.util.concurrent.CompletionStage; + +public interface CrimpedMultiTransaction +{ + /** + * Commit the given operations as a single transaction. Create the + * operation instances via {@link CuratorFramework#transactionOp()} + * + * @param operations operations that make up the transaction. + * @return result details for foreground operations or <code>null</code> for background operations + * @throws Exception errors + */ + CompletionStage<List<CuratorTransactionResult>> forOperations(List<CuratorOp> operations) throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/7689f33e/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java index 2a0ac00..926649a 100644 --- a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java +++ b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java @@ -20,6 +20,9 @@ package org.apache.curator.x.crimps.async; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.framework.api.transaction.OperationType; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.x.crimps.Crimps; @@ -29,6 +32,7 @@ import org.apache.zookeeper.data.Stat; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -122,6 +126,28 @@ public class TestCrimps extends BaseClassForTests } } + @Test + public void testMulti() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + CuratorOp createA = client.transactionOp().create().forPath("/a"); + CuratorOp createB = client.transactionOp().create().forPath("/b"); + + CompletionStage<List<CuratorTransactionResult>> f = async.opResults(client.transaction()).forOperations(Arrays.asList(createA, createB)); + complete(f.handle((ops, e) -> { + Assert.assertNotNull(ops); + Assert.assertEquals(ops.get(0).getType(), OperationType.CREATE); + Assert.assertEquals(ops.get(0).getForPath(), "/a"); + Assert.assertEquals(ops.get(1).getType(), OperationType.CREATE); + Assert.assertEquals(ops.get(1).getForPath(), "/b"); + return null; + })); + } + } + public void assertException(CompletionStage<?> f, KeeperException.Code code) throws Exception { complete(f.handle((value, e) -> {