Needed a method to re-stage watchers that were triggered only by connection problems.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2fa1a69a Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2fa1a69a Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2fa1a69a Branch: refs/heads/CURATOR-3.0 Commit: 2fa1a69afd9b1b9d4be6c756b643ca4d4ce1f810 Parents: b028098 Author: randgalt <randg...@apache.org> Authored: Fri Jan 6 17:22:02 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri Jan 6 17:22:02 2017 -0500 ---------------------------------------------------------------------- .../curator/x/async/AsyncCuratorFramework.java | 17 +++++- .../curator/x/async/AsyncEventException.java | 46 +++++++++++++++ .../org/apache/curator/x/async/WatchMode.java | 42 ++++++++++++++ .../x/async/details/AsyncCreateBuilderImpl.java | 2 +- .../details/AsyncCuratorFrameworkImpl.java | 32 +++++----- .../x/async/details/AsyncDeleteBuilderImpl.java | 2 +- .../x/async/details/AsyncExistsBuilderImpl.java | 9 +-- .../details/AsyncGetChildrenBuilderImpl.java | 9 +-- .../details/AsyncGetConfigBuilderImpl.java | 9 +-- .../async/details/AsyncGetDataBuilderImpl.java | 9 +-- .../async/details/AsyncReconfigBuilderImpl.java | 2 +- .../details/AsyncRemoveWatchesBuilderImpl.java | 2 +- .../x/async/details/AsyncSetACLBuilderImpl.java | 2 +- .../async/details/AsyncSetDataBuilderImpl.java | 2 +- .../curator/x/async/details/BuilderCommon.java | 10 +++- .../x/async/details/InternalCallback.java | 2 +- .../x/async/details/InternalWatcher.java | 61 ++++++++++++++------ .../curator/x/async/TestBasicOperations.java | 37 +++++++++++- 18 files changed, 238 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java ---------------------------------------------------------------------- diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java index 6dc6f3e..9b29918 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java @@ -53,14 +53,29 @@ public interface AsyncCuratorFramework extends AsyncCuratorFrameworkDsl CuratorFramework unwrap(); /** + * <p> * Returns a facade that adds watching to any of the subsequently created builders. i.e. all - * operations on the WatchedAsyncCuratorFramework facade will have watchers set. + * operations on the WatchableAsyncCuratorFramework facade will have watchers set. Also, + * the {@link org.apache.curator.x.async.AsyncStage} returned from these builders will + * have a loaded staged watcher that is accessed from {@link org.apache.curator.x.async.AsyncStage#event()} + * </p> + * + * <p> + * {@link WatchMode#stateChangeAndSuccess} is used + * </p> * * @return watcher facade */ WatchableAsyncCuratorFramework watched(); /** + * Same as {@link #watched()} but allows specifying the watch mode + * + * @return watcher facade + */ + WatchableAsyncCuratorFramework watched(WatchMode mode); + + /** * Returns a facade that adds the given UnhandledErrorListener to all background operations * * @param listener lister to use http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java new file mode 100644 index 0000000..f863215 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java @@ -0,0 +1,46 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import java.util.concurrent.CompletionStage; + +/** + * The exception type set for async watchers + */ +public abstract class AsyncEventException extends Exception +{ + /** + * Returns the error condition that temporarily triggered the watcher. NOTE: the watcher + * will most likely still be set. Use {@link #reset()} to stage on the successful trigger + * + * @return state + */ + public abstract Watcher.Event.KeeperState getKeeperState(); + + /** + * ZooKeeper temporarily triggers watchers when there is a connection event. However, the watcher + * stays set for the original operation. Use this method to reset with a new completion stage + * that will allow waiting for a successful trigger. 
+ * + * @return new stage + */ + public abstract CompletionStage<WatchedEvent> reset(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java new file mode 100644 index 0000000..dbce8c1 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async; + +public enum WatchMode +{ + /** + * The {@link java.util.concurrent.CompletionStage}<org.apache.zookeeper.WatchedEvent> will only + * complete on successful trigger. i.e. connection issues are ignored + */ + successOnly, + + /** + * The {@link java.util.concurrent.CompletionStage}<org.apache.zookeeper.WatchedEvent> will only + * completeExceptionally. Successful trigger is ignored. Connection exceptions are + * of type: {@link org.apache.curator.x.async.AsyncEventException}. 
+ */ + stateChangeOnly, + + /** + * The {@link java.util.concurrent.CompletionStage}<org.apache.zookeeper.WatchedEvent> will + * complete for both successful trigger and connection exceptions. Connection exceptions are + * of type: {@link org.apache.curator.x.async.AsyncEventException}. + */ + stateChangeAndSuccess +} http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java index ce5b31e..7723775 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java @@ -128,7 +128,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder private AsyncStage<String> internalForPath(String path, byte[] data, boolean useData) { - BuilderCommon<String> common = new BuilderCommon<>(unhandledErrorListener, false, nameProc); + BuilderCommon<String> common = new BuilderCommon<>(unhandledErrorListener, nameProc); CreateBuilderImpl builder = new CreateBuilderImpl(client, createMode, common.backgrounding, http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java index d502079..a6101f2 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java +++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java @@ -38,11 +38,11 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework { private final CuratorFrameworkImpl client; private final UnhandledErrorListener unhandledErrorListener; - private final boolean watched; + private final WatchMode watchMode; public AsyncCuratorFrameworkImpl(CuratorFramework client) { - this(reveal(client), null, false); + this(reveal(client), null, null); } private static CuratorFrameworkImpl reveal(CuratorFramework client) @@ -57,11 +57,11 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } } - public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched) + public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) { this.client = client; this.unhandledErrorListener = unhandledErrorListener; - this.watched = watched; + this.watchMode = watchMode; } @Override @@ -99,7 +99,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework @Override public AsyncStage<List<ACL>> forPath(String path) { - BuilderCommon<List<ACL>> common = new BuilderCommon<>(unhandledErrorListener, false, aclProc); + BuilderCommon<List<ACL>> common = new BuilderCommon<>(unhandledErrorListener, aclProc); GetACLBuilderImpl builder = new GetACLBuilderImpl(client, common.backgrounding, stat); return safeCall(common.internalCallback, () -> builder.forPath(path)); } @@ -122,7 +122,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework public AsyncMultiTransaction transaction() { return operations -> { - BuilderCommon<List<CuratorTransactionResult>> common = new BuilderCommon<>(unhandledErrorListener, false, opResultsProc); + BuilderCommon<List<CuratorTransactionResult>> common = new BuilderCommon<>(unhandledErrorListener, opResultsProc); CuratorMultiTransactionImpl 
builder = new CuratorMultiTransactionImpl(client, common.backgrounding); return safeCall(common.internalCallback, () -> builder.forOperations(operations)); }; @@ -132,7 +132,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework public AsyncSyncBuilder sync() { return path -> { - BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc); + BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc); SyncBuilderImpl builder = new SyncBuilderImpl(client, common.backgrounding); return safeCall(common.internalCallback, () -> builder.forPath(path)); }; @@ -153,13 +153,19 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework @Override public WatchableAsyncCuratorFramework watched() { - return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, true); + return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, WatchMode.stateChangeAndSuccess); + } + + @Override + public WatchableAsyncCuratorFramework watched(WatchMode mode) + { + return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, mode); } @Override public AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener listener) { - return new AsyncCuratorFrameworkImpl(client, listener, watched); + return new AsyncCuratorFrameworkImpl(client, listener, watchMode); } @Override @@ -171,24 +177,24 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework @Override public AsyncExistsBuilder checkExists() { - return new AsyncExistsBuilderImpl(client, unhandledErrorListener, watched); + return new AsyncExistsBuilderImpl(client, unhandledErrorListener, watchMode); } @Override public AsyncGetDataBuilder getData() { - return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, watched); + return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, watchMode); } @Override public AsyncGetChildrenBuilder getChildren() { - return new 
AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, watched); + return new AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, watchMode); } @Override public AsyncGetConfigBuilder getConfig() { - return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, watched); + return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, watchMode); } } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java index 54073b0..243ea44 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java @@ -69,7 +69,7 @@ class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder @Override public AsyncStage<Void> forPath(String path) { - BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc); + BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc); DeleteBuilderImpl builder = new DeleteBuilderImpl(client, version, common.backgrounding, options.contains(DeleteOption.deletingChildrenIfNeeded), options.contains(DeleteOption.guaranteed), options.contains(DeleteOption.quietly)); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java index c77a0aa..d672047 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.details; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.ExistsBuilderImpl; +import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.AsyncExistsBuilder; import org.apache.curator.x.async.api.AsyncPathable; import org.apache.curator.x.async.AsyncStage; @@ -37,14 +38,14 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder { private final CuratorFrameworkImpl client; private final UnhandledErrorListener unhandledErrorListener; - private final boolean watched; + private final WatchMode watchMode; private Set<ExistsOption> options = Collections.emptySet(); - AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched) + AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) { this.client = client; this.unhandledErrorListener = unhandledErrorListener; - this.watched = watched; + this.watchMode = watchMode; } @Override @@ -57,7 +58,7 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder @Override public AsyncStage<Stat> forPath(String path) { - BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, watched, safeStatProc); + BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, watchMode, safeStatProc); ExistsBuilderImpl builder = new ExistsBuilderImpl(client, common.backgrounding, common.watcher, options.contains(ExistsOption.createParentsIfNeeded), 
options.contains(ExistsOption.createParentsAsContainers)); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java index b429c58..7258c6c 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.details; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.GetChildrenBuilderImpl; +import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.AsyncGetChildrenBuilder; import org.apache.curator.x.async.api.AsyncPathable; import org.apache.curator.x.async.AsyncStage; @@ -34,20 +35,20 @@ class AsyncGetChildrenBuilderImpl implements AsyncGetChildrenBuilder { private final CuratorFrameworkImpl client; private final UnhandledErrorListener unhandledErrorListener; - private final boolean watched; + private final WatchMode watchMode; private Stat stat = null; - AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched) + AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) { this.client = client; this.unhandledErrorListener = unhandledErrorListener; - this.watched = watched; + this.watchMode = watchMode; } @Override public AsyncStage<List<String>> 
forPath(String path) { - BuilderCommon<List<String>> common = new BuilderCommon<>(unhandledErrorListener, watched, childrenProc); + BuilderCommon<List<String>> common = new BuilderCommon<>(unhandledErrorListener, watchMode, childrenProc); GetChildrenBuilderImpl builder = new GetChildrenBuilderImpl(client, common.watcher, common.backgrounding, stat); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java index 7ecb18a..273fba2 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.details; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.GetConfigBuilderImpl; +import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.AsyncEnsemblable; import org.apache.curator.x.async.api.AsyncGetConfigBuilder; import org.apache.curator.x.async.AsyncStage; @@ -33,14 +34,14 @@ class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder { private final CuratorFrameworkImpl client; private final UnhandledErrorListener unhandledErrorListener; - private final boolean watched; + private final WatchMode watchMode; private Stat stat = null; - AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched) + 
AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) { this.client = client; this.unhandledErrorListener = unhandledErrorListener; - this.watched = watched; + this.watchMode = watchMode; } @Override @@ -53,7 +54,7 @@ class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder @Override public AsyncStage<byte[]> forEnsemble() { - BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watched, dataProc); + BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watchMode, dataProc); GetConfigBuilderImpl builder = new GetConfigBuilderImpl(client, common.backgrounding, common.watcher, stat); return safeCall(common.internalCallback, builder::forEnsemble); } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java index 7214cd8..ac9df4c 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.details; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.GetDataBuilderImpl; +import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.AsyncGetDataBuilder; import org.apache.curator.x.async.api.AsyncPathable; import org.apache.curator.x.async.AsyncStage; @@ -33,15 +34,15 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder { private final 
CuratorFrameworkImpl client; private final UnhandledErrorListener unhandledErrorListener; - private final boolean watched; + private final WatchMode watchMode; private boolean decompressed = false; private Stat stat = null; - AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, boolean watched) + AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) { this.client = client; this.unhandledErrorListener = unhandledErrorListener; - this.watched = watched; + this.watchMode = watchMode; } @Override @@ -69,7 +70,7 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder @Override public AsyncStage<byte[]> forPath(String path) { - BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watched, dataProc); + BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watchMode, dataProc); GetDataBuilderImpl builder = new GetDataBuilderImpl(client, stat, common.watcher, common.backgrounding, decompressed); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java index f6a097e..32b9eb5 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java @@ -116,7 +116,7 @@ class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, AsyncEnsemblable @Override public AsyncStage<Void> forEnsemble() { - BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, false, ignoredProc); + BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc); ReconfigBuilderImpl builder = new ReconfigBuilderImpl(client, common.backgrounding, stat, fromConfig, newMembers, joining, leaving); return safeCall(common.internalCallback, () -> { builder.forEnsemble(); http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java index 7e9e091..98a8bbb 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java @@ -160,7 +160,7 @@ class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder, AsyncP @Override public AsyncStage<Void> forPath(String path) { - BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, false, ignoredProc); + BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc); RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client, watcher, curatorWatcher, http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java index b5f5a06..8908de6 100644 --- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java @@ -62,7 +62,7 @@ class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, AsyncPathable<AsyncS @Override public AsyncStage<Stat> forPath(String path) { - BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, false, statProc); + BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc); SetACLBuilderImpl builder = new SetACLBuilderImpl(client, common.backgrounding, aclList, version); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java index 3df52b9..cf2a56e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java @@ -78,7 +78,7 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder private AsyncStage<Stat> internalForPath(String path, byte[] data, boolean useData) { - BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, false, statProc); + BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc); SetDataBuilderImpl builder = new SetDataBuilderImpl(client, common.backgrounding, version, compressed); return safeCall(common.internalCallback, () -> useData ? 
builder.forPath(path, data) : builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java index 56cd462..043b5b4 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java @@ -20,6 +20,7 @@ package org.apache.curator.x.async.details; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.Backgrounding; +import org.apache.curator.x.async.WatchMode; class BuilderCommon<T> { @@ -27,9 +28,14 @@ class BuilderCommon<T> final Backgrounding backgrounding; final InternalWatcher watcher; - BuilderCommon(UnhandledErrorListener unhandledErrorListener, boolean watched, BackgroundProc<T> proc) + BuilderCommon(UnhandledErrorListener unhandledErrorListener, BackgroundProc<T> proc) { - watcher = watched ? new InternalWatcher() : null; + this(unhandledErrorListener, null, proc); + } + + BuilderCommon(UnhandledErrorListener unhandledErrorListener, WatchMode watchMode, BackgroundProc<T> proc) + { + watcher = (watchMode != null) ? 
new InternalWatcher(watchMode) : null; internalCallback = new InternalCallback<>(proc, watcher); backgrounding = new Backgrounding(internalCallback, unhandledErrorListener); } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java index a766380..505226f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java @@ -40,7 +40,7 @@ class InternalCallback<T> extends CompletableFuture<T> implements BackgroundCall @Override public CompletionStage<WatchedEvent> event() { - return watcher; + return (watcher != null) ? 
watcher.getFuture() : null; } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java index b631748..2c7de9e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java @@ -18,41 +18,68 @@ */ package org.apache.curator.x.async.details; -import org.apache.zookeeper.KeeperException; +import org.apache.curator.x.async.AsyncEventException; +import org.apache.curator.x.async.WatchMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; -class InternalWatcher extends CompletableFuture<WatchedEvent> implements Watcher +class InternalWatcher implements Watcher { + private final WatchMode watchMode; + private volatile CompletableFuture<WatchedEvent> future = new CompletableFuture<>(); + + InternalWatcher(WatchMode watchMode) + { + this.watchMode = watchMode; + } + + CompletableFuture<WatchedEvent> getFuture() + { + return future; + } + @Override public void process(WatchedEvent event) { switch ( event.getState() ) { - case ConnectedReadOnly: - case SyncConnected: - case SaslAuthenticated: + default: { - complete(event); + if ( (watchMode != WatchMode.stateChangeOnly) && (event.getType() != Event.EventType.None) ) + { + if ( !future.complete(event) ) + { + future.obtrudeValue(event); + } + } break; } case Disconnected: - { - completeExceptionally(KeeperException.create(KeeperException.Code.CONNECTIONLOSS)); - break; - } - case AuthFailed: - { - 
completeExceptionally(KeeperException.create(KeeperException.Code.AUTHFAILED)); - break; - } - case Expired: { - completeExceptionally(KeeperException.create(KeeperException.Code.SESSIONEXPIRED)); + if ( watchMode != WatchMode.successOnly ) + { + AsyncEventException exception = new AsyncEventException() + { + @Override + public Event.KeeperState getKeeperState() + { + return event.getState(); + } + + @Override + public CompletionStage<WatchedEvent> reset() + { + future = new CompletableFuture<>(); + return future; + } + }; + future.completeExceptionally(exception); + } break; } } http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java index 2e0fb4d..d66db72 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java @@ -25,13 +25,17 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.io.IOException; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import static java.util.EnumSet.of; @@ -99,6 +103,33 @@ public class TestBasicOperations extends BaseClassForTests 
Assert.assertTrue(timing.awaitLatch(latch)); } + @Test + public void testWatchingWithServerLoss() throws Exception + { + AsyncStage<Stat> stage = client.watched().checkExists().forPath("/test"); + stage.thenRun(() -> { + try + { + server.stop(); + } + catch ( IOException e ) + { + // ignore + } + }); + + CountDownLatch latch = new CountDownLatch(1); + complete(stage.event(), (v, e) -> { + Assert.assertTrue(e instanceof AsyncEventException); + Assert.assertEquals(((AsyncEventException)e).getKeeperState(), Watcher.Event.KeeperState.Disconnected); + ((AsyncEventException)e).reset().thenRun(latch::countDown); + }); + + server.restart(); + client.create().forPath("/test"); + Assert.assertTrue(timing.awaitLatch(latch)); + } + private <T, U> void complete(CompletionStage<T> stage) { complete(stage, (v, e) -> {}); @@ -111,7 +142,7 @@ public class TestBasicOperations extends BaseClassForTests stage.handle((v, e) -> { handler.accept(v, e); return null; - }).toCompletableFuture().get(); + }).toCompletableFuture().get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS); } catch ( InterruptedException e ) { @@ -125,5 +156,9 @@ public class TestBasicOperations extends BaseClassForTests } Assert.fail("get() failed", e); } + catch ( TimeoutException e ) + { + Assert.fail("get() timed out"); + } } }