watched version of getConfig
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3aa51d50 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3aa51d50 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3aa51d50 Branch: refs/heads/CURATOR-3.0 Commit: 3aa51d503afe2f5952686976681194418afe171f Parents: 81f0ab2 Author: randgalt <randg...@apache.org> Authored: Thu Jan 5 23:17:03 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Thu Jan 5 23:17:03 2017 -0500 ---------------------------------------------------------------------- .../curator/x/crimps/async/AsyncCrimps.java | 69 ++++++++-------- .../crimps/async/CrimpedBackgroundCallback.java | 6 +- .../crimps/async/CrimpedConfigEnsembleable.java | 9 +-- .../x/crimps/async/CrimpedEnsembleable.java | 4 +- .../x/crimps/async/CrimpedEnsembleableImpl.java | 2 +- .../async/CrimpedWatchedEnsembleable.java | 29 +++++++ .../async/CrimpedWatchedEnsembleableImpl.java | 83 ++++++++++++++++++++ .../curator/x/crimps/async/TestCrimps.java | 9 +-- 8 files changed, 155 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java index 639deca..bc3a2c3 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/AsyncCrimps.java @@ -22,7 +22,6 @@ import org.apache.curator.framework.api.*; import org.apache.curator.framework.api.transaction.CuratorMultiTransactionMain; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import java.util.List; @@ -98,28 +97,28 @@ public class AsyncCrimps return build(builder, dataProc); } + public CrimpedPathable<Crimped<byte[]>> dataWatched(Watchable<BackgroundPathable<byte[]>> builder) + { + return build(builder, dataProc); + } + public CrimpedPathable<CompletionStage<List<String>>> children(BackgroundPathable<List<String>> builder) { return build(builder, childrenProc); } - public CrimpedPathable<CompletionStage<Stat>> stat(BackgroundPathable<Stat> builder) + public CrimpedPathable<Crimped<List<String>>> childrenWatched(Watchable<BackgroundPathable<List<String>>> builder) { - return build(builder, statProc); + return build(builder, childrenProc); } - public CrimpedPathable<CompletionStage<Stat>> safeStat(BackgroundPathable<Stat> builder) + public CrimpedPathable<CompletionStage<Stat>> stat(BackgroundPathable<Stat> builder) { return build(builder, safeStatProc); } public CrimpedPathable<Crimped<Stat>> statWatched(Watchable<BackgroundPathable<Stat>> builder) { - return build(builder, statProc); - } - - public CrimpedPathable<Crimped<Stat>> safeStatWatched(Watchable<BackgroundPathable<Stat>> builder) - { return build(builder, safeStatProc); } @@ -133,26 +132,23 @@ public class AsyncCrimps return build(builder, statProc); } - public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerEnsembleable<byte[]>> builder) + public CrimpedWatchedEnsembleable ensembleWatched(Watchable<BackgroundEnsembleable<byte[]>> builder) { - CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc); - - Ensembleable<byte[]> main; - if ( unhandledErrorListener != null ) - { - main = builder.inBackground(callback).withUnhandledErrorListener(unhandledErrorListener); - } - else - { - main = builder.inBackground(callback); - } + CrimpedWatcher crimpedWatcher = new CrimpedWatcher(); + CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, crimpedWatcher); + BackgroundEnsembleable<byte[]> localBuilder = builder.usingWatcher(crimpedWatcher); + return new CrimpedWatchedEnsembleableImpl(toFinalBuilder(callback, localBuilder), callback); + } - return new CrimpedEnsembleableImpl(main, callback); + public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerEnsembleable<byte[]>> builder) + { + CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null); + return new CrimpedEnsembleableImpl(toFinalBuilder(callback, builder), callback); } public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerReconfigBuilderMain> builder, List<String> newMembers) { - CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc); + CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null); ReconfigBuilderMain main; if ( unhandledErrorListener != null ) @@ -169,7 +165,7 @@ public class AsyncCrimps public CrimpedEnsembleable ensemble(Backgroundable<ErrorListenerReconfigBuilderMain> builder, List<String> joining, List<String> leaving) { - CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc); + CrimpedBackgroundCallback<byte[]> callback = new CrimpedBackgroundCallback<>(dataProc, null); ReconfigBuilderMain main; if ( unhandledErrorListener != null ) @@ -204,7 +200,7 @@ public class AsyncCrimps public CrimpedMultiTransaction opResults(Backgroundable<ErrorListenerMultiTransactionMain> builder) { - CrimpedBackgroundCallback<List<CuratorTransactionResult>> callback = new CrimpedBackgroundCallback<>(opResultsProc); + CrimpedBackgroundCallback<List<CuratorTransactionResult>> callback = new CrimpedBackgroundCallback<>(opResultsProc, null); ErrorListenerMultiTransactionMain main = builder.inBackground(callback); CuratorMultiTransactionMain finalBuilder = (unhandledErrorListener != null) ? main.withUnhandledErrorListener(unhandledErrorListener) : main; return ops -> { @@ -222,7 +218,7 @@ public class AsyncCrimps public <T> CrimpledPathAndBytesable<CompletionStage<T>> build(BackgroundPathAndBytesable<T> builder, BackgroundProc<T> backgroundProc) { - CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc); + CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc, null); ErrorListenerPathAndBytesable<T> localBuilder = builder.inBackground(callback); PathAndBytesable<T> finalLocalBuilder = (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder; return new CrimpledPathAndBytesableImpl<>(finalLocalBuilder, callback, null); @@ -231,28 +227,27 @@ public class AsyncCrimps public <T> CrimpedPathable<Crimped<T>> build(Watchable<BackgroundPathable<T>> builder, BackgroundProc<T> backgroundProc) { CrimpedWatcher crimpedWatcher = new CrimpedWatcher(); - CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc) - { - @Override - public CompletionStage<WatchedEvent> event() - { - return crimpedWatcher; - } - }; + CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<>(backgroundProc, crimpedWatcher); Pathable<T> finalLocalBuilder = toFinalBuilder(callback, builder.usingWatcher(crimpedWatcher)); return new CrimpledPathAndBytesableImpl<T, Crimped<T>>(finalLocalBuilder, callback, crimpedWatcher); } public <T> CrimpedPathable<CompletionStage<T>> build(BackgroundPathable<T> builder, BackgroundProc<T> backgroundProc) { - CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc); + CrimpedBackgroundCallback<T> callback = new CrimpedBackgroundCallback<T>(backgroundProc, null); Pathable<T> finalLocalBuilder = toFinalBuilder(callback, builder); return new CrimpledPathAndBytesableImpl<>(finalLocalBuilder, callback, null); } - private <T> Pathable<T> toFinalBuilder(CrimpedBackgroundCallback<T> callback, BackgroundPathable<T> backgroundPathable) + private Ensembleable<byte[]> toFinalBuilder(CrimpedBackgroundCallback<byte[]> callback, Backgroundable<ErrorListenerEnsembleable<byte[]>> builder) + { + ErrorListenerEnsembleable<byte[]> localBuilder = builder.inBackground(callback); + return (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder; + } + + private <T> Pathable<T> toFinalBuilder(CrimpedBackgroundCallback<T> callback, BackgroundPathable<T> builder) { - ErrorListenerPathable<T> localBuilder = backgroundPathable.inBackground(callback); + ErrorListenerPathable<T> localBuilder = builder.inBackground(callback); return (unhandledErrorListener != null) ? localBuilder.withUnhandledErrorListener(unhandledErrorListener) : localBuilder; } http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java index b3c20d2..9d9a6fe 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedBackgroundCallback.java @@ -28,16 +28,18 @@ import java.util.concurrent.CompletionStage; class CrimpedBackgroundCallback<T> extends CompletableFuture<T> implements BackgroundCallback, Crimped<T> { private final BackgroundProc<T> resultFunction; + private final CrimpedWatcher watcher; - CrimpedBackgroundCallback(BackgroundProc<T> resultFunction) + CrimpedBackgroundCallback(BackgroundProc<T> resultFunction, CrimpedWatcher watcher) { this.resultFunction = resultFunction; + this.watcher = watcher; } @Override public CompletionStage<WatchedEvent> event() { - return null; + return watcher; } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java index 10c0c35..6d8811f 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedConfigEnsembleable.java @@ -18,16 +18,13 @@ */ package org.apache.curator.x.crimps.async; -import org.apache.curator.framework.api.Ensembleable; -import java.util.concurrent.CompletionStage; - -public interface CrimpedConfigEnsembleable extends - CrimpledEnsembleable<CompletionStage<byte[]>> +public interface CrimpedConfigEnsembleable<T> extends + CrimpledEnsembleable<T> { /** * Sets the configuration version to use. * @param config The version of the configuration. * @return this */ - CrimpledEnsembleable<CompletionStage<byte[]>> fromConfig(long config); + CrimpledEnsembleable<T> fromConfig(long config); } http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java index c8166e7..7824fb6 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleable.java @@ -22,8 +22,8 @@ import org.apache.curator.framework.api.Statable; import java.util.concurrent.CompletionStage; public interface CrimpedEnsembleable extends - CrimpedConfigEnsembleable, - Statable<CrimpedConfigEnsembleable>, + CrimpedConfigEnsembleable<CompletionStage<byte[]>>, + Statable<CrimpedConfigEnsembleable<CompletionStage<byte[]>>>, CrimpledEnsembleable<CompletionStage<byte[]>> { } http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java index d94d242..b5b7a43 100644 --- a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedEnsembleableImpl.java @@ -62,7 +62,7 @@ class CrimpedEnsembleableImpl implements CrimpedEnsembleable } @Override - public CrimpedConfigEnsembleable storingStatIn(Stat stat) + public CrimpedConfigEnsembleable<CompletionStage<byte[]>> storingStatIn(Stat stat) { ensembleable = configureEnsembleable = configBuilder.storingStatIn(stat); return this; http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java new file mode 100644 index 0000000..be0cc58 --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleable.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.crimps.async; + +import org.apache.curator.framework.api.Statable; +import java.util.concurrent.CompletionStage; + +public interface CrimpedWatchedEnsembleable extends + CrimpedConfigEnsembleable<Crimped<byte[]>>, + Statable<CrimpedConfigEnsembleable<Crimped<byte[]>>>, + CrimpledEnsembleable<Crimped<byte[]>> +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java new file mode 100644 index 0000000..78c7304 --- /dev/null +++ b/curator-x-crimps/src/main/java/org/apache/curator/x/crimps/async/CrimpedWatchedEnsembleableImpl.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.crimps.async; + +import org.apache.curator.framework.api.ConfigureEnsembleable; +import org.apache.curator.framework.api.Ensembleable; +import org.apache.curator.framework.api.Statable; +import org.apache.zookeeper.data.Stat; + +class CrimpedWatchedEnsembleableImpl implements CrimpedWatchedEnsembleable +{ + private final CrimpedBackgroundCallback<byte[]> callback; + private final Statable<ConfigureEnsembleable> configBuilder; + private Ensembleable<byte[]> ensembleable; + private ConfigureEnsembleable configureEnsembleable; + + CrimpedWatchedEnsembleableImpl(Statable<ConfigureEnsembleable> configBuilder, CrimpedBackgroundCallback<byte[]> callback) + { + this.configBuilder = configBuilder; + this.callback = callback; + configureEnsembleable = configBuilder.storingStatIn(new Stat()); + ensembleable = configureEnsembleable; + } + + CrimpedWatchedEnsembleableImpl(Ensembleable<byte[]> ensembleable, CrimpedBackgroundCallback<byte[]> callback) + { + this.ensembleable = ensembleable; + this.configBuilder = null; + this.callback = callback; + configureEnsembleable = null; + } + + @Override + public Crimped<byte[]> forEnsemble() + { + try + { + ensembleable.forEnsemble(); + } + catch ( Exception e ) + { + callback.completeExceptionally(e); + } + return callback; + } + + @Override + public CrimpedConfigEnsembleable<Crimped<byte[]>> storingStatIn(Stat stat) + { + ensembleable = configureEnsembleable = configBuilder.storingStatIn(stat); + return this; + } + + @Override + public CrimpledEnsembleable<Crimped<byte[]>> fromConfig(long config) + { + try + { + ensembleable = configureEnsembleable.fromConfig(config); + } + catch ( Exception e ) + { + callback.completeExceptionally(e); + } + return this; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/3aa51d50/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java ---------------------------------------------------------------------- diff --git a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java index d5b7a6d..62065c3 100644 --- a/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java +++ b/curator-x-crimps/src/test/java/org/apache/curator/x/crimps/async/TestCrimps.java @@ -108,20 +108,13 @@ public class TestCrimps extends BaseClassForTests { client.start(); - CompletionStage<Stat> f = async.safeStat(client.checkExists()).forPath("/test"); + CompletionStage<Stat> f = async.stat(client.checkExists()).forPath("/test"); complete(f.handle((stat, e) -> { Assert.assertNull(e); Assert.assertNull(stat); return null; })); - f = async.stat(client.checkExists()).forPath("/test"); - complete(f.handle((stat, e) -> { - Assert.assertNotNull(e); - Assert.assertNull(stat); - return null; - })); - async.path(client.create()).forPath("/test").toCompletableFuture().get(); f = async.stat(client.checkExists()).forPath("/test"); complete(f.handle((stat, e) -> {