wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/10170c26 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/10170c26 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/10170c26 Branch: refs/heads/CURATOR-3.0 Commit: 10170c2691687cc9f266b6b19ab57b75b088233c Parents: 9b84ba3 Author: randgalt <randg...@apache.org> Authored: Fri Jan 6 12:49:17 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri Jan 6 12:49:17 2017 -0500 ---------------------------------------------------------------------- .../framework/imps/CreateBuilderImpl.java | 17 +- .../framework/imps/GetConfigBuilderImpl.java | 8 + .../framework/imps/ReconfigBuilderImpl.java | 13 +- .../imps/RemoveWatchesBuilderImpl.java | 12 ++ .../curator/framework/imps/SyncBuilderImpl.java | 6 + .../curator/x/async/AsyncCreateBuilder.java | 33 +++ .../curator/x/async/AsyncCuratorFramework.java | 11 +- .../x/async/AsyncCuratorFrameworkDsl.java | 21 +- .../curator/x/async/AsyncEnsemblable.java | 29 +++ .../curator/x/async/AsyncGetConfigBuilder.java | 8 + .../curator/x/async/AsyncReconfigBuilder.java | 51 +++++ .../x/async/AsyncRemoveWatchesBuilder.java | 33 +++ .../curator/x/async/AsyncSyncBuilder.java | 6 + .../x/async/AsyncTransactionCheckBuilder.java | 15 ++ .../x/async/AsyncTransactionCreateBuilder.java | 17 ++ .../x/async/AsyncTransactionDeleteBuilder.java | 9 + .../curator/x/async/AsyncTransactionOp.java | 56 +++++ .../x/async/AsyncTransactionSetDataBuilder.java | 13 ++ .../apache/curator/x/async/CreateOption.java | 10 + .../curator/x/async/RemoveWatcherOption.java | 8 + .../x/async/WatchedAsyncCuratorFramework.java | 6 +- .../x/async/details/AsyncCreateBuilderImpl.java | 127 +++++++++++ .../details/AsyncCuratorFrameworkImpl.java | 51 +++-- .../details/AsyncGetConfigBuilderImpl.java | 42 ++++ .../async/details/AsyncReconfigBuilderImpl.java | 108 ++++++++++ .../details/AsyncRemoveWatchesBuilderImpl.java | 157 ++++++++++++++ .../x/async/details/AsyncTransactionOpImpl.java | 215 +++++++++++++++++++ 27 files changed, 1032 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index 457b4ee..bbb98ea 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -41,7 +41,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> +public class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> { private final CuratorFrameworkImpl client; private CreateMode createMode; @@ -76,6 +76,21 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt storingStat = null; } + public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean createParentsAsContainers, boolean doProtected, boolean compress, boolean setDataIfExists, List<ACL> aclList, Stat storingStat) + { + this.client = client; + this.createMode = createMode; + this.backgrounding = backgrounding; + this.createParentsIfNeeded = createParentsIfNeeded; + this.createParentsAsContainers = createParentsAsContainers; + this.doProtected = doProtected; + this.compress = compress; + this.setDataIfExists = setDataIfExists; + protectedId = null; + this.acling = new ACLing(client.getAclProvider(), aclList); + this.storingStat = storingStat; + } + @Override public CreateBuilderMain orSetData() { http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java index 1ab9043..db2d6e4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -45,6 +45,14 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati watching = new Watching(client); } + public GetConfigBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, Watcher watcher, Stat stat) + { + this.client = client; + this.backgrounding = backgrounding; + this.watching = new Watching(client, watcher); + this.stat = stat; + } + @Override public WatchBackgroundEnsembleable<byte[]> storingStatIn(Stat stat) { http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index df00785..97be59a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -47,7 +47,18 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation this.client = client; } - private byte[] forEnsemble() throws Exception + public ReconfigBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, Stat responseStat, long fromConfig, List<String> newMembers, List<String> joining, List<String> leaving) + { + this.client = client; + this.backgrounding = backgrounding; + this.responseStat = responseStat; + this.fromConfig = fromConfig; + this.newMembers = newMembers; + this.joining = joining; + this.leaving = leaving; + } + + public byte[] forEnsemble() throws Exception { if ( backgrounding.inBackground() ) { http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index 27a3c0f..e14deff 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -54,6 +54,18 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat this.backgrounding = new Backgrounding(); } + public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding) + { + this.client = client; + this.watcher = watcher; + this.curatorWatcher = curatorWatcher; + this.watcherType = watcherType; + this.guaranteed = guaranteed; + this.local = local; + this.quietly = quietly; + this.backgrounding = backgrounding; + } + void internalRemoval(Watcher watcher, String path) throws Exception { this.watcher = watcher; http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java index 542b834..3dec17c 100755 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java @@ -40,6 +40,12 @@ public class SyncBuilderImpl implements SyncBuilder, BackgroundOperation<String> this.client = client; } + public SyncBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding) + { + this.client = client; + this.backgrounding = backgrounding; + } + @Override public ErrorListenerPathable<Void> inBackground() { http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCreateBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCreateBuilder.java new file mode 100644 index 0000000..7e28398 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCreateBuilder.java @@ -0,0 +1,33 @@ +package org.apache.curator.x.async; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import java.util.List; +import java.util.Set; + +public interface AsyncCreateBuilder extends + AsyncPathAndBytesable<AsyncStage<String>> +{ + /** + * Have the operation fill the provided stat object + * + * @param stat the stat to have filled in + * @return this + */ + AsyncPathable<AsyncStage<String>> storingStatIn(Stat stat); + + AsyncPathable<AsyncStage<String>> withMode(CreateMode createMode); + + AsyncPathable<AsyncStage<String>> withACL(List<ACL> aclList); + + AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options); + + AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, List<ACL> aclList); + + AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList); + + AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode); + + AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java index 2f8cdf1..604be98 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java @@ -10,18 +10,9 @@ import org.apache.curator.framework.api.transaction.TransactionOp; */ public interface AsyncCuratorFramework extends AsyncCuratorFrameworkDsl { - CuratorFramework getCuratorFramework(); + CuratorFramework unwrap(); WatchedAsyncCuratorFramework watched(); AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener listener); - - /** - * Allocate an operation that can be used with {@link #transaction()}. - * NOTE: {@link CuratorOp} instances created by this builder are - * reusable. - * - * @return operation builder - */ - TransactionOp transactionOp(); // TODO - versions that don't throw } http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java index 48d1a58..b7e03d1 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFrameworkDsl.java @@ -1,13 +1,11 @@ package org.apache.curator.x.async; -import org.apache.curator.framework.api.CreateBuilder; -import org.apache.curator.framework.api.DeleteBuilder; import org.apache.curator.framework.api.GetACLBuilder; import org.apache.curator.framework.api.ReconfigBuilder; import org.apache.curator.framework.api.RemoveWatchesBuilder; import org.apache.curator.framework.api.SetACLBuilder; -import org.apache.curator.framework.api.SetDataBuilder; import org.apache.curator.framework.api.SyncBuilder; +import org.apache.curator.framework.api.transaction.CuratorOp; /** * Zookeeper framework-style client @@ -19,7 +17,7 @@ public interface AsyncCuratorFrameworkDsl extends WatchedAsyncCuratorFramework * * @return builder object */ - CreateBuilder create(); + AsyncCreateBuilder create(); /** * Start a delete builder @@ -54,7 +52,7 @@ public interface AsyncCuratorFrameworkDsl extends WatchedAsyncCuratorFramework * * @return builder object */ - ReconfigBuilder reconfig(); + AsyncReconfigBuilder reconfig(); /** * Start a transaction builder @@ -64,16 +62,25 @@ public interface AsyncCuratorFrameworkDsl extends WatchedAsyncCuratorFramework AsyncMultiTransaction transaction(); /** + * Allocate an operation that can be used with {@link #transaction()}. + * NOTE: {@link CuratorOp} instances created by this builder are + * reusable. + * + * @return operation builder + */ + AsyncTransactionOp transactionOp(); + + /** * Start a sync builder. Note: sync is ALWAYS in the background even * if you don't use one of the background() methods * * @return builder object */ - SyncBuilder sync(); + AsyncSyncBuilder sync(); /** * Start a remove watches builder. * @return builder object */ - RemoveWatchesBuilder watches(); + AsyncRemoveWatchesBuilder watches(); } http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEnsemblable.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEnsemblable.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEnsemblable.java new file mode 100644 index 0000000..55aa918 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEnsemblable.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async; + +public interface AsyncEnsemblable<T> +{ + /** + * Commit the currently building operation + * + * @return operation result if any + */ + T forEnsemble(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncGetConfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncGetConfigBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncGetConfigBuilder.java new file mode 100644 index 0000000..19e5b6a --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncGetConfigBuilder.java @@ -0,0 +1,8 @@ +package org.apache.curator.x.async; + +import org.apache.zookeeper.data.Stat; + +public interface AsyncGetConfigBuilder extends AsyncEnsemblable<AsyncStage<byte[]>> +{ + AsyncEnsemblable<AsyncStage<byte[]>> storingStatIn(Stat stat); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncReconfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncReconfigBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncReconfigBuilder.java new file mode 100644 index 0000000..75b898e --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncReconfigBuilder.java @@ -0,0 +1,51 @@ +package org.apache.curator.x.async; + +import org.apache.zookeeper.data.Stat; +import java.util.List; + +public interface AsyncReconfigBuilder +{ + /** + * Sets one or more members that are meant to be the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * + * @param servers The servers joining. + * @return this + */ + AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers); + + AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving); + + /** + * Sets one or more members that are meant to be the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * + * @param servers The servers joining. + * @return this + */ + AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, long fromConfig); + + AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, long fromConfig); + + /** + * Sets one or more members that are meant to be the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * + * @param servers The servers joining. + * @return this + */ + AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, Stat stat); + + AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, Stat stat); + + /** + * Sets one or more members that are meant to be the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * + * @param servers The servers joining. + * @return this + */ + AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, Stat stat, long fromConfig); + + AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, Stat stat, long fromConfig); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncRemoveWatchesBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncRemoveWatchesBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncRemoveWatchesBuilder.java new file mode 100644 index 0000000..fe8efa1 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncRemoveWatchesBuilder.java @@ -0,0 +1,33 @@ +package org.apache.curator.x.async; + +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.zookeeper.Watcher; +import java.util.Set; + +public interface AsyncRemoveWatchesBuilder extends + AsyncPathable<AsyncStage<Void>> +{ + AsyncPathable<AsyncStage<Void>> removing(Watcher watcher); + + AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher); + + AsyncPathable<AsyncStage<Void>> removingAll(); + + AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Set<RemoveWatcherOption> options); + + AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Set<RemoveWatcherOption> options); + + AsyncPathable<AsyncStage<Void>> removingAll(Set<RemoveWatcherOption> options); + + AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options); + + AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options); + + AsyncPathable<AsyncStage<Void>> removingAll(Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options); + + AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType); + + AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType); + + AsyncPathable<AsyncStage<Void>> removingAll(Watcher.WatcherType watcherType); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncSyncBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncSyncBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncSyncBuilder.java new file mode 100644 index 0000000..0289aa7 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncSyncBuilder.java @@ -0,0 +1,6 @@ +package org.apache.curator.x.async; + +public interface AsyncSyncBuilder extends + AsyncPathable<AsyncStage<Void>> +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCheckBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCheckBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCheckBuilder.java new file mode 100644 index 0000000..032029d --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCheckBuilder.java @@ -0,0 +1,15 @@ +package org.apache.curator.x.async; + +import org.apache.curator.framework.api.transaction.CuratorOp; + +public interface AsyncTransactionCheckBuilder extends + AsyncPathable<CuratorOp> +{ + /** + * Use the given version (the default is -1) + * + * @param version version to use + * @return this + */ + AsyncPathable<CuratorOp> withVersion(int version); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCreateBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCreateBuilder.java new file mode 100644 index 0000000..a94a011 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionCreateBuilder.java @@ -0,0 +1,17 @@ +package org.apache.curator.x.async; + +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; +import java.util.List; + +public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable<CuratorOp> +{ + AsyncPathable<CuratorOp> withMode(CreateMode createMode); + + AsyncPathable<CuratorOp> withACL(List<ACL> aclList); + + AsyncPathable<CuratorOp> compressed(); + + AsyncPathable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionDeleteBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionDeleteBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionDeleteBuilder.java new file mode 100644 index 0000000..984999e --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionDeleteBuilder.java @@ -0,0 +1,9 @@ +package org.apache.curator.x.async; + +import org.apache.curator.framework.api.transaction.CuratorOp; + +public interface AsyncTransactionDeleteBuilder extends + AsyncPathable<CuratorOp> +{ + AsyncPathable<CuratorOp> withVersion(int version); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionOp.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionOp.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionOp.java new file mode 100644 index 0000000..2e2f886 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionOp.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async; + +import org.apache.curator.framework.CuratorFramework; + +/** + * Builds operations that can be committed as a transaction + * via {@link CuratorFramework#transaction()} + */ +public interface AsyncTransactionOp +{ + /** + * Start a create builder in the transaction + * + * @return builder object + */ + AsyncTransactionCreateBuilder create(); + + /** + * Start a delete builder in the transaction + * + * @return builder object + */ + AsyncTransactionDeleteBuilder delete(); + + /** + * Start a setData builder in the transaction + * + * @return builder object + */ + AsyncTransactionSetDataBuilder setData(); + + /** + * Start a check builder in the transaction + * + * @return builder object + */ + AsyncTransactionCheckBuilder check(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionSetDataBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionSetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionSetDataBuilder.java new file mode 100644 index 0000000..dfb1b9a --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncTransactionSetDataBuilder.java @@ -0,0 +1,13 @@ +package org.apache.curator.x.async; + +import org.apache.curator.framework.api.transaction.CuratorOp; + +public interface AsyncTransactionSetDataBuilder extends + AsyncPathAndBytesable<CuratorOp> +{ + AsyncPathAndBytesable<CuratorOp> withVersion(int version); + + AsyncPathAndBytesable<CuratorOp> compressed(); + + AsyncPathAndBytesable<CuratorOp> withVersionCompressed(int version); +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/CreateOption.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/CreateOption.java b/curator-x-async/src/main/java/org/apache/curator/x/async/CreateOption.java new file mode 100644 index 0000000..5c7b741 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/CreateOption.java @@ -0,0 +1,10 @@ +package org.apache.curator.x.async; + +public enum CreateOption +{ + createParentsIfNeeded, + createParentsAsContainers, + doProtected, + compress, + setDataIfExists +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/RemoveWatcherOption.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/RemoveWatcherOption.java b/curator-x-async/src/main/java/org/apache/curator/x/async/RemoveWatcherOption.java new file mode 100644 index 0000000..e4e8688 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/RemoveWatcherOption.java @@ -0,0 +1,8 @@ +package org.apache.curator.x.async; + +public enum RemoveWatcherOption +{ + guaranteed, + local, + quietly +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java index b40c623..62f03ed 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchedAsyncCuratorFramework.java @@ -1,9 +1,5 @@ package org.apache.curator.x.async; -import org.apache.curator.framework.api.ExistsBuilder; -import org.apache.curator.framework.api.GetConfigBuilder; -import org.apache.curator.framework.api.GetDataBuilder; - /** * Zookeeper framework-style client */ @@ -38,5 +34,5 @@ public interface WatchedAsyncCuratorFramework * * @return builder object */ - GetConfigBuilder getConfig(); + AsyncGetConfigBuilder getConfig(); } http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java new file mode 100644 index 0000000..921cc0d --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java @@ -0,0 +1,127 @@ +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.imps.CreateBuilderImpl; +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.x.async.AsyncCreateBuilder; +import org.apache.curator.x.async.AsyncPathable; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.CreateOption; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +import static org.apache.curator.x.async.details.BackgroundProcs.pathProc; +import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; + +class AsyncCreateBuilderImpl implements AsyncCreateBuilder +{ + private final CuratorFrameworkImpl client; + private final UnhandledErrorListener unhandledErrorListener; + private CreateMode createMode = CreateMode.PERSISTENT; + private List<ACL> aclList = null; + private Set<CreateOption> options = Collections.emptySet(); + private Stat stat = null; + + AsyncCreateBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + { + this.client = client; + this.unhandledErrorListener = unhandledErrorListener; + } + + @Override + public AsyncPathable<AsyncStage<String>> storingStatIn(Stat stat) + { + this.stat = stat; + return this; + } + + @Override + public AsyncPathable<AsyncStage<String>> withMode(CreateMode createMode) + { + this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); + return this; + } + + @Override + public AsyncPathable<AsyncStage<String>> withACL(List<ACL> aclList) + { + this.aclList = aclList; + return this; + } + + @Override + public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options) + { + this.options = Objects.requireNonNull(options, "options cannot be null"); + return this; + } + + @Override + public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, List<ACL> aclList) + { + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.aclList = aclList; + return this; + } + + @Override + public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList) + { + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.aclList = aclList; + this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); + return this; + } + + @Override + public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode) + { + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); + return this; + } + + @Override + public AsyncPathable<AsyncStage<String>> withOptions(Set<CreateOption> options, CreateMode createMode, List<ACL> aclList, Stat stat) + { + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.aclList = aclList; + this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); + this.stat = stat; + return this; + } + + @Override + public AsyncStage<String> forPath(String path) + { + return internalForPath(path, null, false); + } + + @Override + public AsyncStage<String> forPath(String path, byte[] data) + { + return internalForPath(path, data, true); + } + + private AsyncStage<String> internalForPath(String path, byte[] data, boolean useData) + { + BuilderCommon<String> common = new BuilderCommon<>(unhandledErrorListener, false, pathProc); + CreateBuilderImpl builder = new CreateBuilderImpl(client, + createMode, + common.backgrounding, + options.contains(CreateOption.createParentsIfNeeded), + options.contains(CreateOption.createParentsAsContainers), + options.contains(CreateOption.doProtected), + options.contains(CreateOption.compress), + options.contains(CreateOption.setDataIfExists), + aclList, + stat + ); + return safeCall(common.internalCallback, () -> useData ? builder.forPath(path, data) : builder.forPath(path)); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java index 193a428..2bb922b 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java @@ -1,24 +1,19 @@ package org.apache.curator.x.async.details; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.*; +import org.apache.curator.framework.api.GetACLBuilder; +import org.apache.curator.framework.api.ReconfigBuilder; +import org.apache.curator.framework.api.RemoveWatchesBuilder; +import org.apache.curator.framework.api.SetACLBuilder; +import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; -import org.apache.curator.framework.api.transaction.TransactionOp; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.CuratorMultiTransactionImpl; -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.apache.curator.x.async.AsyncCuratorFrameworkDsl; -import org.apache.curator.x.async.AsyncDeleteBuilder; -import org.apache.curator.x.async.AsyncExistsBuilder; -import org.apache.curator.x.async.AsyncGetChildrenBuilder; -import org.apache.curator.x.async.AsyncGetDataBuilder; -import org.apache.curator.x.async.AsyncMultiTransaction; -import org.apache.curator.x.async.AsyncSetDataBuilder; -import org.apache.curator.x.async.WatchedAsyncCuratorFramework; +import org.apache.curator.framework.imps.SyncBuilderImpl; +import org.apache.curator.x.async.*; import java.util.List; -import static org.apache.curator.x.async.details.BackgroundProcs.opResultsProc; -import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; +import static org.apache.curator.x.async.details.BackgroundProcs.*; public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework { @@ -34,9 +29,9 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } @Override - public CreateBuilder create() + public AsyncCreateBuilder create() { - return null; + return new AsyncCreateBuilderImpl(client, unhandledErrorListener); } @Override @@ -64,9 +59,9 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } @Override - public ReconfigBuilder reconfig() + public AsyncReconfigBuilder reconfig() { - return null; + return new AsyncReconfigBuilderImpl(client, unhandledErrorListener); } @Override @@ -80,19 +75,23 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } @Override - public SyncBuilder sync() + public AsyncSyncBuilder sync() { - return null; + return path -> { + BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc); + SyncBuilderImpl builder = new SyncBuilderImpl(client, common.backgrounding); + return safeCall(common.internalCallback, () -> builder.forPath(path)); + }; } @Override - public RemoveWatchesBuilder watches() + public AsyncRemoveWatchesBuilder watches() { - return null; + return new AsyncRemoveWatchesBuilderImpl(client, unhandledErrorListener); } @Override - public CuratorFramework getCuratorFramework() + public CuratorFramework unwrap() { return client; } @@ -110,9 +109,9 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } @Override - public TransactionOp transactionOp() + public AsyncTransactionOp transactionOp() { - return client.transactionOp(); + return new AsyncTransactionOpImpl(client); } @Override @@ -134,8 +133,8 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } @Override - public GetConfigBuilder getConfig() + public AsyncGetConfigBuilder getConfig() { - return null; + return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, watched); } } http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java new file mode 100644 index 0000000..5673e29 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java @@ -0,0 +1,42 @@ +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.imps.GetConfigBuilderImpl; +import org.apache.curator.x.async.AsyncEnsemblable; +import org.apache.curator.x.async.AsyncGetConfigBuilder; +import org.apache.curator.x.async.AsyncStage; +import org.apache.zookeeper.data.Stat; + +import static org.apache.curator.x.async.details.BackgroundProcs.dataProc; +import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; + +class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder +{ + private final CuratorFrameworkImpl client; + private final UnhandledErrorListener unhandledErrorListener; + private final boolean watched; + private Stat stat = null; + + AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched) + { + this.client = client; + this.unhandledErrorListener = unhandledErrorListener; + this.watched = watched; + } + + @Override + public AsyncEnsemblable<AsyncStage<byte[]>> storingStatIn(Stat stat) + { + this.stat = stat; + return this; + } + + @Override + public AsyncStage<byte[]> forEnsemble() + { + BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watched, dataProc); + GetConfigBuilderImpl builder = new GetConfigBuilderImpl(client, common.backgrounding, common.watcher, stat); + return safeCall(common.internalCallback, builder::forEnsemble); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java new file mode 100644 index 0000000..ce0c21e --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java @@ -0,0 +1,108 @@ +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.imps.ReconfigBuilderImpl; +import org.apache.curator.x.async.AsyncEnsemblable; +import org.apache.curator.x.async.AsyncReconfigBuilder; +import org.apache.curator.x.async.AsyncStage; +import org.apache.zookeeper.data.Stat; +import java.util.List; + +import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; +import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; + +class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, AsyncEnsemblable<AsyncStage<Void>> +{ + private final CuratorFrameworkImpl client; + private final UnhandledErrorListener unhandledErrorListener; + private Stat stat = null; + private long fromConfig = -1; + private List<String> newMembers = null; + private List<String> joining = null; + private List<String> leaving = null; + + AsyncReconfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + { + this.client = client; + this.unhandledErrorListener = unhandledErrorListener; + } + + @Override + public AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers) + { + this.newMembers = servers; + return this; + } + + @Override + public AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving) + { + this.joining = joining; + this.leaving = leaving; + return this; + } + + @Override + public AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, Stat stat) + { + this.newMembers = servers; + this.stat = stat; + return this; + } + + @Override + public AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, Stat stat) + { + this.joining = joining; + this.leaving = leaving; + return this; + } + + @Override + public AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, Stat stat, long fromConfig) + { + this.newMembers = servers; + this.stat = stat; + this.fromConfig = fromConfig; + return this; + } + + @Override + public AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, Stat stat, long fromConfig) + { + this.joining = joining; + this.leaving = leaving; + this.stat = stat; + this.fromConfig = fromConfig; + return this; + } + + @Override + public AsyncEnsemblable<AsyncStage<Void>> withNewMembers(List<String> servers, long fromConfig) + { + this.newMembers = servers; + this.fromConfig = fromConfig; + return this; + } + + @Override + public AsyncEnsemblable<AsyncStage<Void>> withJoiningAndLeaving(List<String> joining, List<String> leaving, long fromConfig) + { + this.joining = joining; + this.leaving = leaving; + this.fromConfig = fromConfig; + return this; + } + + @Override + public AsyncStage<Void> forEnsemble() + { + BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc); + ReconfigBuilderImpl builder = new ReconfigBuilderImpl(client, common.backgrounding, stat, fromConfig, newMembers, joining, leaving); + return safeCall(common.internalCallback, () -> { + builder.forEnsemble(); + return null; + }); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java new file mode 100644 index 0000000..553cd68 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java @@ -0,0 +1,157 @@ +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.imps.RemoveWatchesBuilderImpl; +import org.apache.curator.x.async.AsyncPathable; +import org.apache.curator.x.async.AsyncRemoveWatchesBuilder; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.RemoveWatcherOption; +import org.apache.zookeeper.Watcher; +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; +import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; + +class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder +{ + private final CuratorFrameworkImpl client; + private final UnhandledErrorListener unhandledErrorListener; + private Watcher.WatcherType watcherType = Watcher.WatcherType.Any; + private Set<RemoveWatcherOption> options = Collections.emptySet(); + private Watcher watcher = null; + private CuratorWatcher curatorWatcher = null; + + AsyncRemoveWatchesBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + { + this.client = client; + this.unhandledErrorListener = unhandledErrorListener; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher) + { + this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null"); + this.curatorWatcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher) + { + this.curatorWatcher = Objects.requireNonNull(watcher, "watcher cannot be null"); + this.watcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removingAll() + { + this.curatorWatcher = null; + this.watcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Set<RemoveWatcherOption> options) + { + this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null"); + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.curatorWatcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Set<RemoveWatcherOption> options) + { + this.curatorWatcher = Objects.requireNonNull(watcher, "watcher cannot be null"); + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.watcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removingAll(Set<RemoveWatcherOption> options) + { + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.curatorWatcher = null; + this.watcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options) + { + this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null"); + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null"); + this.curatorWatcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options) + { + this.curatorWatcher = Objects.requireNonNull(watcher, "watcher cannot be null"); + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null"); + this.watcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removingAll(Watcher.WatcherType watcherType, Set<RemoveWatcherOption> options) + { + this.options = Objects.requireNonNull(options, "options cannot be null"); + this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null"); + this.curatorWatcher = null; + this.watcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType) + { + this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null"); + this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null"); + this.curatorWatcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType) + { + this.curatorWatcher = Objects.requireNonNull(watcher, "watcher cannot be null"); + this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null"); + this.watcher = null; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> removingAll(Watcher.WatcherType watcherType) + { + this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null"); + this.curatorWatcher = null; + this.watcher = null; + return this; + } + + @Override + public AsyncStage<Void> forPath(String path) + { + BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc); + RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client, + watcher, + curatorWatcher, + watcherType, + options.contains(RemoveWatcherOption.guaranteed), + options.contains(RemoveWatcherOption.local), + options.contains(RemoveWatcherOption.guaranteed), + common.backgrounding + ); + return safeCall(common.internalCallback, () -> builder.forPath(path)); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/10170c26/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java new file mode 100644 index 0000000..02506c1 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java @@ -0,0 +1,215 @@ +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.api.ACLCreateModePathAndBytesable; +import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.api.VersionPathAndBytesable; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.TransactionCreateBuilder; +import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder; +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.x.async.AsyncPathAndBytesable; +import org.apache.curator.x.async.AsyncPathable; +import org.apache.curator.x.async.AsyncTransactionCheckBuilder; +import org.apache.curator.x.async.AsyncTransactionCreateBuilder; +import org.apache.curator.x.async.AsyncTransactionDeleteBuilder; +import org.apache.curator.x.async.AsyncTransactionOp; +import org.apache.curator.x.async.AsyncTransactionSetDataBuilder; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; +import java.util.List; +import java.util.Objects; + +class AsyncTransactionOpImpl implements AsyncTransactionOp +{ + private final CuratorFrameworkImpl client; + + AsyncTransactionOpImpl(CuratorFrameworkImpl client) + { + this.client = client; + } + + @Override + public AsyncTransactionCreateBuilder create() + { + return new AsyncTransactionCreateBuilder() + { + private List<ACL> aclList = null; + private CreateMode createMode = CreateMode.PERSISTENT; + private boolean compressed = false; + + @Override + public AsyncPathable<CuratorOp> withMode(CreateMode createMode) + { + this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); + return this; + } + + @Override + public AsyncPathable<CuratorOp> withACL(List<ACL> aclList) + { + this.aclList = aclList; + return this; + } + + @Override + public AsyncPathable<CuratorOp> compressed() + { + compressed = true; + return this; + } + + @Override + public AsyncPathable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed) + { + this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); + this.aclList = aclList; + this.compressed = compressed; + return this; + } + + @Override + public CuratorOp forPath(String path, byte[] data) + { + return internalForPath(path, data, true); + } + + @Override + public CuratorOp forPath(String path) + { + return internalForPath(path, null, false); + } + + private CuratorOp internalForPath(String path, byte[] data, boolean useData) + { + TransactionCreateBuilder<CuratorOp> builder1 = client.transactionOp().create(); + ACLCreateModePathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed() : builder1; + PathAndBytesable<CuratorOp> builder3 = builder2.withACL(aclList); + try + { + return useData ? builder3.forPath(path, data) : builder3.forPath(path); + } + catch ( Exception e ) + { + throw new RuntimeException(e); // should never happen + } + } + }; + } + + @Override + public AsyncTransactionDeleteBuilder delete() + { + return new AsyncTransactionDeleteBuilder() + { + private int version = -1; + + @Override + public AsyncPathable<CuratorOp> withVersion(int version) + { + this.version = version; + return this; + } + + @Override + public CuratorOp forPath(String path) + { + try + { + return client.transactionOp().delete().withVersion(version).forPath(path); + } + catch ( Exception e ) + { + throw new RuntimeException(e); // should never happen + } + } + }; + } + + @Override + public AsyncTransactionSetDataBuilder setData() + { + return new AsyncTransactionSetDataBuilder() + { + private int version = -1; + private boolean compressed = false; + + @Override + public AsyncPathAndBytesable<CuratorOp> withVersion(int version) + { + this.version = version; + return this; + } + + @Override + public AsyncPathAndBytesable<CuratorOp> compressed() + { + compressed = true; + return this; + } + + @Override + public AsyncPathAndBytesable<CuratorOp> withVersionCompressed(int version) + { + this.version = version; + compressed = true; + return this; + } + + @Override + public CuratorOp forPath(String path, byte[] data) + { + return internalForPath(path, data, true); + } + + @Override + public CuratorOp forPath(String path) + { + return internalForPath(path, null, false); + } + + private CuratorOp internalForPath(String path, byte[] data, boolean useData) + { + TransactionSetDataBuilder<CuratorOp> builder1 = client.transactionOp().setData(); + VersionPathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed() : builder1; + PathAndBytesable<CuratorOp> builder3 = builder2.withVersion(version); + try + { + return useData ? builder3.forPath(path, data) : builder3.forPath(path); + } + catch ( Exception e ) + { + throw new RuntimeException(e); // should never happen + } + } + }; + } + + @Override + public AsyncTransactionCheckBuilder check() + { + return new AsyncTransactionCheckBuilder() + { + private int version = -1; + + @Override + public AsyncPathable<CuratorOp> withVersion(int version) + { + this.version = version; + return this; + } + + @Override + public CuratorOp forPath(String path) + { + try + { + return client.transactionOp().check().withVersion(version).forPath(path); + } + catch ( Exception e ) + { + throw new RuntimeException(e); // should never happen + } + } + }; + } +}