Needed a method to re-stage watchers that triggered only for connection 
problems.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2fa1a69a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2fa1a69a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2fa1a69a

Branch: refs/heads/CURATOR-3.0
Commit: 2fa1a69afd9b1b9d4be6c756b643ca4d4ce1f810
Parents: b028098
Author: randgalt <randg...@apache.org>
Authored: Fri Jan 6 17:22:02 2017 -0500
Committer: randgalt <randg...@apache.org>
Committed: Fri Jan 6 17:22:02 2017 -0500

----------------------------------------------------------------------
 .../curator/x/async/AsyncCuratorFramework.java  | 17 +++++-
 .../curator/x/async/AsyncEventException.java    | 46 +++++++++++++++
 .../org/apache/curator/x/async/WatchMode.java   | 42 ++++++++++++++
 .../x/async/details/AsyncCreateBuilderImpl.java |  2 +-
 .../details/AsyncCuratorFrameworkImpl.java      | 32 +++++-----
 .../x/async/details/AsyncDeleteBuilderImpl.java |  2 +-
 .../x/async/details/AsyncExistsBuilderImpl.java |  9 +--
 .../details/AsyncGetChildrenBuilderImpl.java    |  9 +--
 .../details/AsyncGetConfigBuilderImpl.java      |  9 +--
 .../async/details/AsyncGetDataBuilderImpl.java  |  9 +--
 .../async/details/AsyncReconfigBuilderImpl.java |  2 +-
 .../details/AsyncRemoveWatchesBuilderImpl.java  |  2 +-
 .../x/async/details/AsyncSetACLBuilderImpl.java |  2 +-
 .../async/details/AsyncSetDataBuilderImpl.java  |  2 +-
 .../curator/x/async/details/BuilderCommon.java  | 10 +++-
 .../x/async/details/InternalCallback.java       |  2 +-
 .../x/async/details/InternalWatcher.java        | 61 ++++++++++++++------
 .../curator/x/async/TestBasicOperations.java    | 37 +++++++++++-
 18 files changed, 238 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
index 6dc6f3e..9b29918 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
@@ -53,14 +53,29 @@ public interface AsyncCuratorFramework extends 
AsyncCuratorFrameworkDsl
     CuratorFramework unwrap();
 
     /**
+     * <p>
      * Returns a facade that adds watching to any of the subsequently created 
builders. i.e. all
-     * operations on the WatchedAsyncCuratorFramework facade will have 
watchers set.
+     * operations on the WatchableAsyncCuratorFramework facade will have 
watchers set. Also,
+     * the {@link org.apache.curator.x.async.AsyncStage} returned from these 
builders will
+     * have a loaded staged watcher that is accessed from {@link 
org.apache.curator.x.async.AsyncStage#event()}
+     * </p>
+     *
+     * <p>
+     * {@link WatchMode#stateChangeAndSuccess} is used
+     * </p>
      *
      * @return watcher facade
      */
     WatchableAsyncCuratorFramework watched();
 
     /**
+     * Same as {@link #watched()} but allows specifying the watch mode
+     *
+     * @return watcher facade
+     */
+    WatchableAsyncCuratorFramework watched(WatchMode mode);
+
+    /**
      * Returns a facade that adds the given UnhandledErrorListener to all 
background operations
      *
      * @param listener lister to use

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java
new file mode 100644
index 0000000..f863215
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncEventException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * The exception type set for async watchers
+ */
+public abstract class AsyncEventException extends Exception
+{
+    /**
+     * Returns the error condition that temporarily triggered the watcher. 
NOTE: the watcher
+     * will most likely still be set. Use {@link #reset()} to stage on the 
successful trigger
+     *
+     * @return state
+     */
+    public abstract Watcher.Event.KeeperState getKeeperState();
+
+    /**
+     * ZooKeeper temporarily triggers watchers when there is a connection 
event. However, the watcher
+     * stays set for the original operation. Use this method to reset with a 
new completion stage
+     * that will allow waiting for a successful trigger.
+     *
+     * @return new stage
+     */
+    public abstract CompletionStage<WatchedEvent> reset();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java 
b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java
new file mode 100644
index 0000000..dbce8c1
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/WatchMode.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+public enum WatchMode
+{
+    /**
+     * The {@link 
java.util.concurrent.CompletionStage}&lt;org.apache.zookeeper.WatchedEvent&gt; 
will only
+     * complete on successful trigger. i.e. connection issues are ignored
+     */
+    successOnly,
+
+    /**
+     * The {@link 
java.util.concurrent.CompletionStage}&lt;org.apache.zookeeper.WatchedEvent&gt; 
will only
+     * completeExceptionally. Successful trigger is ignored. Connection 
exceptions are
+     * of type: {@link org.apache.curator.x.async.AsyncEventException}.
+     */
+    stateChangeOnly,
+
+    /**
+     * The {@link 
java.util.concurrent.CompletionStage}&lt;org.apache.zookeeper.WatchedEvent&gt; 
will
+     * complete for both successful trigger and connection exceptions. 
Connection exceptions are
+     * of type: {@link org.apache.curator.x.async.AsyncEventException}.
+     */
+    stateChangeAndSuccess
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
index ce5b31e..7723775 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -128,7 +128,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
 
     private AsyncStage<String> internalForPath(String path, byte[] data, 
boolean useData)
     {
-        BuilderCommon<String> common = new 
BuilderCommon<>(unhandledErrorListener, false, nameProc);
+        BuilderCommon<String> common = new 
BuilderCommon<>(unhandledErrorListener, nameProc);
         CreateBuilderImpl builder = new CreateBuilderImpl(client,
             createMode,
             common.backgrounding,

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index d502079..a6101f2 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -38,11 +38,11 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
 {
     private final CuratorFrameworkImpl client;
     private final UnhandledErrorListener unhandledErrorListener;
-    private final boolean watched;
+    private final WatchMode watchMode;
 
     public AsyncCuratorFrameworkImpl(CuratorFramework client)
     {
-        this(reveal(client), null, false);
+        this(reveal(client), null, null);
     }
 
     private static CuratorFrameworkImpl reveal(CuratorFramework client)
@@ -57,11 +57,11 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
         }
     }
 
-    public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, boolean watched)
+    public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
     {
         this.client = client;
         this.unhandledErrorListener = unhandledErrorListener;
-        this.watched = watched;
+        this.watchMode = watchMode;
     }
 
     @Override
@@ -99,7 +99,7 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
             @Override
             public AsyncStage<List<ACL>> forPath(String path)
             {
-                BuilderCommon<List<ACL>> common = new 
BuilderCommon<>(unhandledErrorListener, false, aclProc);
+                BuilderCommon<List<ACL>> common = new 
BuilderCommon<>(unhandledErrorListener, aclProc);
                 GetACLBuilderImpl builder = new GetACLBuilderImpl(client, 
common.backgrounding, stat);
                 return safeCall(common.internalCallback, () -> 
builder.forPath(path));
             }
@@ -122,7 +122,7 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     public AsyncMultiTransaction transaction()
     {
         return operations -> {
-            BuilderCommon<List<CuratorTransactionResult>> common = new 
BuilderCommon<>(unhandledErrorListener, false, opResultsProc);
+            BuilderCommon<List<CuratorTransactionResult>> common = new 
BuilderCommon<>(unhandledErrorListener, opResultsProc);
             CuratorMultiTransactionImpl builder = new 
CuratorMultiTransactionImpl(client, common.backgrounding);
             return safeCall(common.internalCallback, () -> 
builder.forOperations(operations));
         };
@@ -132,7 +132,7 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     public AsyncSyncBuilder sync()
     {
         return path -> {
-            BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+            BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, ignoredProc);
             SyncBuilderImpl builder = new SyncBuilderImpl(client, 
common.backgrounding);
             return safeCall(common.internalCallback, () -> 
builder.forPath(path));
         };
@@ -153,13 +153,19 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     @Override
     public WatchableAsyncCuratorFramework watched()
     {
-        return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, 
true);
+        return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, 
WatchMode.stateChangeAndSuccess);
+    }
+
+    @Override
+    public WatchableAsyncCuratorFramework watched(WatchMode mode)
+    {
+        return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, 
mode);
     }
 
     @Override
     public AsyncCuratorFrameworkDsl 
withUnhandledErrorListener(UnhandledErrorListener listener)
     {
-        return new AsyncCuratorFrameworkImpl(client, listener, watched);
+        return new AsyncCuratorFrameworkImpl(client, listener, watchMode);
     }
 
     @Override
@@ -171,24 +177,24 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     @Override
     public AsyncExistsBuilder checkExists()
     {
-        return new AsyncExistsBuilderImpl(client, unhandledErrorListener, 
watched);
+        return new AsyncExistsBuilderImpl(client, unhandledErrorListener, 
watchMode);
     }
 
     @Override
     public AsyncGetDataBuilder getData()
     {
-        return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, 
watched);
+        return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, 
watchMode);
     }
 
     @Override
     public AsyncGetChildrenBuilder getChildren()
     {
-        return new AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, 
watched);
+        return new AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, 
watchMode);
     }
 
     @Override
     public AsyncGetConfigBuilder getConfig()
     {
-        return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, 
watched);
+        return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, 
watchMode);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
index 54073b0..243ea44 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
@@ -69,7 +69,7 @@ class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder
     @Override
     public AsyncStage<Void> forPath(String path)
     {
-        BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+        BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, ignoredProc);
         DeleteBuilderImpl builder = new DeleteBuilderImpl(client, version, 
common.backgrounding, options.contains(DeleteOption.deletingChildrenIfNeeded), 
options.contains(DeleteOption.guaranteed), 
options.contains(DeleteOption.quietly));
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
index c77a0aa..d672047 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.details;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.ExistsBuilderImpl;
+import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.AsyncExistsBuilder;
 import org.apache.curator.x.async.api.AsyncPathable;
 import org.apache.curator.x.async.AsyncStage;
@@ -37,14 +38,14 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder
 {
     private final CuratorFrameworkImpl client;
     private final UnhandledErrorListener unhandledErrorListener;
-    private final boolean watched;
+    private final WatchMode watchMode;
     private Set<ExistsOption> options = Collections.emptySet();
 
-    AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener 
unhandledErrorListener, boolean watched)
+    AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener 
unhandledErrorListener, WatchMode watchMode)
     {
         this.client = client;
         this.unhandledErrorListener = unhandledErrorListener;
-        this.watched = watched;
+        this.watchMode = watchMode;
     }
 
     @Override
@@ -57,7 +58,7 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder
     @Override
     public AsyncStage<Stat> forPath(String path)
     {
-        BuilderCommon<Stat> common = new 
BuilderCommon<>(unhandledErrorListener, watched, safeStatProc);
+        BuilderCommon<Stat> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, safeStatProc);
         ExistsBuilderImpl builder = new ExistsBuilderImpl(client, 
common.backgrounding, common.watcher, 
options.contains(ExistsOption.createParentsIfNeeded), 
options.contains(ExistsOption.createParentsAsContainers));
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
index b429c58..7258c6c 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.details;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.GetChildrenBuilderImpl;
+import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.AsyncGetChildrenBuilder;
 import org.apache.curator.x.async.api.AsyncPathable;
 import org.apache.curator.x.async.AsyncStage;
@@ -34,20 +35,20 @@ class AsyncGetChildrenBuilderImpl implements 
AsyncGetChildrenBuilder
 {
     private final CuratorFrameworkImpl client;
     private final UnhandledErrorListener unhandledErrorListener;
-    private final boolean watched;
+    private final WatchMode watchMode;
     private Stat stat = null;
 
-    AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, boolean watched)
+    AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
     {
         this.client = client;
         this.unhandledErrorListener = unhandledErrorListener;
-        this.watched = watched;
+        this.watchMode = watchMode;
     }
 
     @Override
     public AsyncStage<List<String>> forPath(String path)
     {
-        BuilderCommon<List<String>> common = new 
BuilderCommon<>(unhandledErrorListener, watched, childrenProc);
+        BuilderCommon<List<String>> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, childrenProc);
         GetChildrenBuilderImpl builder = new GetChildrenBuilderImpl(client, 
common.watcher, common.backgrounding, stat);
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
index 7ecb18a..273fba2 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.details;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.GetConfigBuilderImpl;
+import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.AsyncEnsemblable;
 import org.apache.curator.x.async.api.AsyncGetConfigBuilder;
 import org.apache.curator.x.async.AsyncStage;
@@ -33,14 +34,14 @@ class AsyncGetConfigBuilderImpl implements 
AsyncGetConfigBuilder
 {
     private final CuratorFrameworkImpl client;
     private final UnhandledErrorListener unhandledErrorListener;
-    private final boolean watched;
+    private final WatchMode watchMode;
     private Stat stat = null;
 
-    AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, boolean watched)
+    AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
     {
         this.client = client;
         this.unhandledErrorListener = unhandledErrorListener;
-        this.watched = watched;
+        this.watchMode = watchMode;
     }
 
     @Override
@@ -53,7 +54,7 @@ class AsyncGetConfigBuilderImpl implements 
AsyncGetConfigBuilder
     @Override
     public AsyncStage<byte[]> forEnsemble()
     {
-        BuilderCommon<byte[]> common = new 
BuilderCommon<>(unhandledErrorListener, watched, dataProc);
+        BuilderCommon<byte[]> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, dataProc);
         GetConfigBuilderImpl builder = new GetConfigBuilderImpl(client, 
common.backgrounding, common.watcher, stat);
         return safeCall(common.internalCallback, builder::forEnsemble);
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
index 7214cd8..ac9df4c 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.details;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.GetDataBuilderImpl;
+import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.AsyncGetDataBuilder;
 import org.apache.curator.x.async.api.AsyncPathable;
 import org.apache.curator.x.async.AsyncStage;
@@ -33,15 +34,15 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder
 {
     private final CuratorFrameworkImpl client;
     private final UnhandledErrorListener unhandledErrorListener;
-    private final boolean watched;
+    private final WatchMode watchMode;
     private boolean decompressed = false;
     private Stat stat = null;
 
-    AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, boolean watched)
+    AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
     {
         this.client = client;
         this.unhandledErrorListener = unhandledErrorListener;
-        this.watched = watched;
+        this.watchMode = watchMode;
     }
 
     @Override
@@ -69,7 +70,7 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder
     @Override
     public AsyncStage<byte[]> forPath(String path)
     {
-        BuilderCommon<byte[]> common = new 
BuilderCommon<>(unhandledErrorListener, watched, dataProc);
+        BuilderCommon<byte[]> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, dataProc);
         GetDataBuilderImpl builder = new GetDataBuilderImpl(client, stat, 
common.watcher, common.backgrounding, decompressed);
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
index f6a097e..32b9eb5 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
@@ -116,7 +116,7 @@ class AsyncReconfigBuilderImpl implements 
AsyncReconfigBuilder, AsyncEnsemblable
     @Override
     public AsyncStage<Void> forEnsemble()
     {
-        BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+        BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, ignoredProc);
         ReconfigBuilderImpl builder = new ReconfigBuilderImpl(client, 
common.backgrounding, stat, fromConfig, newMembers, joining, leaving);
         return safeCall(common.internalCallback, () -> {
             builder.forEnsemble();

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
index 7e9e091..98a8bbb 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
@@ -160,7 +160,7 @@ class AsyncRemoveWatchesBuilderImpl implements 
AsyncRemoveWatchesBuilder, AsyncP
     @Override
     public AsyncStage<Void> forPath(String path)
     {
-        BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, false, ignoredProc);
+        BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, ignoredProc);
         RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client,
             watcher,
             curatorWatcher,

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
index b5f5a06..8908de6 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
@@ -62,7 +62,7 @@ class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, 
AsyncPathable<AsyncS
     @Override
     public AsyncStage<Stat> forPath(String path)
     {
-        BuilderCommon<Stat> common = new 
BuilderCommon<>(unhandledErrorListener, false, statProc);
+        BuilderCommon<Stat> common = new 
BuilderCommon<>(unhandledErrorListener, statProc);
         SetACLBuilderImpl builder = new SetACLBuilderImpl(client, 
common.backgrounding, aclList, version);
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
index 3df52b9..cf2a56e 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
@@ -78,7 +78,7 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder
 
     private AsyncStage<Stat> internalForPath(String path, byte[] data, boolean 
useData)
     {
-        BuilderCommon<Stat> common = new 
BuilderCommon<>(unhandledErrorListener, false, statProc);
+        BuilderCommon<Stat> common = new 
BuilderCommon<>(unhandledErrorListener, statProc);
         SetDataBuilderImpl builder = new SetDataBuilderImpl(client, 
common.backgrounding, version, compressed);
         return safeCall(common.internalCallback, () -> useData ? 
builder.forPath(path, data) : builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
index 56cd462..043b5b4 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async.details;
 
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.Backgrounding;
+import org.apache.curator.x.async.WatchMode;
 
 class BuilderCommon<T>
 {
@@ -27,9 +28,14 @@ class BuilderCommon<T>
     final Backgrounding backgrounding;
     final InternalWatcher watcher;
 
-    BuilderCommon(UnhandledErrorListener unhandledErrorListener, boolean 
watched, BackgroundProc<T> proc)
+    BuilderCommon(UnhandledErrorListener unhandledErrorListener, 
BackgroundProc<T> proc)
     {
-        watcher = watched ? new InternalWatcher() : null;
+        this(unhandledErrorListener, null, proc);
+    }
+
+    BuilderCommon(UnhandledErrorListener unhandledErrorListener, WatchMode 
watchMode, BackgroundProc<T> proc)
+    {
+        watcher = (watchMode != null) ? new InternalWatcher(watchMode) : null;
         internalCallback = new InternalCallback<>(proc, watcher);
         backgrounding = new Backgrounding(internalCallback, 
unhandledErrorListener);
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
index a766380..505226f 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
@@ -40,7 +40,7 @@ class InternalCallback<T> extends CompletableFuture<T> 
implements BackgroundCall
     @Override
     public CompletionStage<WatchedEvent> event()
     {
-        return watcher;
+        return (watcher != null) ? watcher.getFuture() : null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
index b631748..2c7de9e 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
@@ -18,41 +18,68 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.zookeeper.KeeperException;
+import org.apache.curator.x.async.AsyncEventException;
+import org.apache.curator.x.async.WatchMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 
-class InternalWatcher extends CompletableFuture<WatchedEvent> implements 
Watcher
+class InternalWatcher implements Watcher
 {
+    private final WatchMode watchMode;
+    private volatile CompletableFuture<WatchedEvent> future = new 
CompletableFuture<>();
+
+    InternalWatcher(WatchMode watchMode)
+    {
+        this.watchMode = watchMode;
+    }
+
+    CompletableFuture<WatchedEvent> getFuture()
+    {
+        return future;
+    }
+
     @Override
     public void process(WatchedEvent event)
     {
         switch ( event.getState() )
         {
-            case ConnectedReadOnly:
-            case SyncConnected:
-            case SaslAuthenticated:
+            default:
             {
-                complete(event);
+                if ( (watchMode != WatchMode.stateChangeOnly) && 
(event.getType() != Event.EventType.None) )
+                {
+                    if ( !future.complete(event) )
+                    {
+                        future.obtrudeValue(event);
+                    }
+                }
                 break;
             }
 
             case Disconnected:
-            {
-                
completeExceptionally(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
-                break;
-            }
-
             case AuthFailed:
-            {
-                
completeExceptionally(KeeperException.create(KeeperException.Code.AUTHFAILED));
-                break;
-            }
-
             case Expired:
             {
-                
completeExceptionally(KeeperException.create(KeeperException.Code.SESSIONEXPIRED));
+                if ( watchMode != WatchMode.successOnly )
+                {
+                    AsyncEventException exception = new AsyncEventException()
+                    {
+                        @Override
+                        public Event.KeeperState getKeeperState()
+                        {
+                            return event.getState();
+                        }
+
+                        @Override
+                        public CompletionStage<WatchedEvent> reset()
+                        {
+                            future = new CompletableFuture<>();
+                            return future;
+                        }
+                    };
+                    future.completeExceptionally(exception);
+                }
                 break;
             }
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa1a69a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 2e0fb4d..d66db72 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -25,13 +25,17 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import java.io.IOException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 
 import static java.util.EnumSet.of;
@@ -99,6 +103,33 @@ public class TestBasicOperations extends BaseClassForTests
         Assert.assertTrue(timing.awaitLatch(latch));
     }
 
+    @Test
+    public void testWatchingWithServerLoss() throws Exception
+    {
+        AsyncStage<Stat> stage = 
client.watched().checkExists().forPath("/test");
+        stage.thenRun(() -> {
+            try
+            {
+                server.stop();
+            }
+            catch ( IOException e )
+            {
+                // ignore
+            }
+        });
+
+        CountDownLatch latch = new CountDownLatch(1);
+        complete(stage.event(), (v, e) -> {
+            Assert.assertTrue(e instanceof AsyncEventException);
+            Assert.assertEquals(((AsyncEventException)e).getKeeperState(), 
Watcher.Event.KeeperState.Disconnected);
+            ((AsyncEventException)e).reset().thenRun(latch::countDown);
+        });
+
+        server.restart();
+        client.create().forPath("/test");
+        Assert.assertTrue(timing.awaitLatch(latch));
+    }
+
     private <T, U> void complete(CompletionStage<T> stage)
     {
         complete(stage, (v, e) -> {});
@@ -111,7 +142,7 @@ public class TestBasicOperations extends BaseClassForTests
             stage.handle((v, e) -> {
                 handler.accept(v, e);
                 return null;
-            }).toCompletableFuture().get();
+            }).toCompletableFuture().get(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS);
         }
         catch ( InterruptedException e )
         {
@@ -125,5 +156,9 @@ public class TestBasicOperations extends BaseClassForTests
             }
             Assert.fail("get() failed", e);
         }
+        catch ( TimeoutException e )
+        {
+            Assert.fail("get() timed out");
+        }
     }
 }

Reply via email to