Added a filtering feature plus some refactoring

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ee4031de
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ee4031de
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ee4031de

Branch: refs/heads/CURATOR-3.0
Commit: ee4031de78ab06494013e47911896e4e689ba131
Parents: 3876d3e
Author: randgalt <randg...@apache.org>
Authored: Sat Jan 7 15:25:54 2017 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sat Jan 7 15:25:54 2017 -0500

----------------------------------------------------------------------
 .../curator/x/async/AsyncCuratorFramework.java  | 29 +++++++++-
 .../x/async/details/AsyncCreateBuilderImpl.java | 11 ++--
 .../details/AsyncCuratorFrameworkImpl.java      | 59 ++++++++++++--------
 .../x/async/details/AsyncDeleteBuilderImpl.java | 11 ++--
 .../x/async/details/AsyncExistsBuilderImpl.java | 11 ++--
 .../details/AsyncGetChildrenBuilderImpl.java    | 11 ++--
 .../details/AsyncGetConfigBuilderImpl.java      | 11 ++--
 .../async/details/AsyncGetDataBuilderImpl.java  | 11 ++--
 .../async/details/AsyncReconfigBuilderImpl.java | 11 ++--
 .../details/AsyncRemoveWatchesBuilderImpl.java  | 11 ++--
 .../x/async/details/AsyncSetACLBuilderImpl.java | 11 ++--
 .../async/details/AsyncSetDataBuilderImpl.java  | 11 ++--
 .../curator/x/async/details/BuilderCommon.java  | 13 ++---
 .../apache/curator/x/async/details/Filters.java | 53 ++++++++++++++++++
 .../x/async/details/InternalCallback.java       |  6 +-
 .../x/async/details/InternalWatcher.java        | 16 ++++--
 .../framework/imps/TestFrameworkBackground.java |  2 +-
 17 files changed, 189 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
index 91784b0..183a5eb 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java
@@ -19,10 +19,12 @@
 package org.apache.curator.x.async;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.x.async.api.AsyncCuratorFrameworkDsl;
-import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
 import org.apache.curator.x.async.details.AsyncCuratorFrameworkImpl;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.function.UnaryOperator;
 
 /**
  * Zookeeper framework-style client that returns composable async operations
@@ -58,5 +60,28 @@ public interface AsyncCuratorFramework extends 
AsyncCuratorFrameworkDsl
      * @param listener lister to use
      * @return facade
      */
-    AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener 
listener);
+    AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener);
+
+    /**
+     * Returns a facade that adds the given filters to all background 
operations and watchers.
+     * <code>resultFilter</code> will get called for every background 
callback. <code>watcherFilter</code>
+     * will get called for every watcher. The filters can return new versions 
or unchanged versions
+     * of the arguments.
+     *
+     * @param resultFilter filter to use or <code>null</code>
+     * @param watcherFilter filter to use or <code>null</code>
+     * @return facade
+     */
+    AsyncCuratorFrameworkDsl with(UnaryOperator<CuratorEvent> resultFilter, 
UnaryOperator<WatchedEvent> watcherFilter);
+
+    /**
+     * Set any combination of listener or filters
+     *
+     * @param resultFilter filter to use or <code>null</code>
+     * @param watcherFilter filter to use or <code>null</code>
+     * @see #with(java.util.function.UnaryOperator, 
java.util.function.UnaryOperator)
+     * @see #with(org.apache.curator.framework.api.UnhandledErrorListener)
+     * @return facade
+     */
+    AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener, 
UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> 
watcherFilter);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
index b3c91b3..b2b9000 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -18,12 +18,11 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CreateBuilderImpl;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.AsyncCreateBuilder;
 import org.apache.curator.x.async.api.AsyncPathAndBytesable;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.ACL;
@@ -39,16 +38,16 @@ import static 
org.apache.curator.x.async.details.BackgroundProcs.safeCall;
 class AsyncCreateBuilderImpl implements AsyncCreateBuilder
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private CreateMode createMode = CreateMode.PERSISTENT;
     private List<ACL> aclList = null;
     private Set<CreateOption> options = Collections.emptySet();
     private Stat stat = null;
 
-    AsyncCreateBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener 
unhandledErrorListener)
+    AsyncCreateBuilderImpl(CuratorFrameworkImpl client, Filters filters)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
     }
 
     @Override
@@ -128,7 +127,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
 
     private AsyncStage<String> internalForPath(String path, byte[] data, 
boolean useData)
     {
-        BuilderCommon<String> common = new 
BuilderCommon<>(unhandledErrorListener, nameProc);
+        BuilderCommon<String> common = new BuilderCommon<>(filters, nameProc);
         CreateBuilderImpl builder = new CreateBuilderImpl(client,
             createMode,
             common.backgrounding,

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index a6101f2..aa82644 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -19,6 +19,7 @@
 package org.apache.curator.x.async.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
@@ -27,22 +28,24 @@ import org.apache.curator.framework.imps.GetACLBuilderImpl;
 import org.apache.curator.framework.imps.SyncBuilderImpl;
 import org.apache.curator.x.async.*;
 import org.apache.curator.x.async.api.*;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.UnaryOperator;
 
 import static org.apache.curator.x.async.details.BackgroundProcs.*;
 
 public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private final WatchMode watchMode;
 
     public AsyncCuratorFrameworkImpl(CuratorFramework client)
     {
-        this(reveal(client), null, null);
+        this(reveal(client), new Filters(null, null, null), null);
     }
 
     private static CuratorFrameworkImpl reveal(CuratorFramework client)
@@ -57,29 +60,29 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
         }
     }
 
-    public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+    public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, Filters 
filters, WatchMode watchMode)
     {
-        this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.filters = Objects.requireNonNull(filters, "filters cannot be 
null");
         this.watchMode = watchMode;
     }
 
     @Override
     public AsyncCreateBuilder create()
     {
-        return new AsyncCreateBuilderImpl(client, unhandledErrorListener);
+        return new AsyncCreateBuilderImpl(client, filters);
     }
 
     @Override
     public AsyncDeleteBuilder delete()
     {
-        return new AsyncDeleteBuilderImpl(client, unhandledErrorListener);
+        return new AsyncDeleteBuilderImpl(client, filters);
     }
 
     @Override
     public AsyncSetDataBuilder setData()
     {
-        return new AsyncSetDataBuilderImpl(client, unhandledErrorListener);
+        return new AsyncSetDataBuilderImpl(client, filters);
     }
 
     @Override
@@ -99,7 +102,7 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
             @Override
             public AsyncStage<List<ACL>> forPath(String path)
             {
-                BuilderCommon<List<ACL>> common = new 
BuilderCommon<>(unhandledErrorListener, aclProc);
+                BuilderCommon<List<ACL>> common = new BuilderCommon<>(filters, 
aclProc);
                 GetACLBuilderImpl builder = new GetACLBuilderImpl(client, 
common.backgrounding, stat);
                 return safeCall(common.internalCallback, () -> 
builder.forPath(path));
             }
@@ -109,20 +112,20 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     @Override
     public AsyncSetACLBuilder setACL()
     {
-        return new AsyncSetACLBuilderImpl(client, unhandledErrorListener);
+        return new AsyncSetACLBuilderImpl(client, filters);
     }
 
     @Override
     public AsyncReconfigBuilder reconfig()
     {
-        return new AsyncReconfigBuilderImpl(client, unhandledErrorListener);
+        return new AsyncReconfigBuilderImpl(client, filters);
     }
 
     @Override
     public AsyncMultiTransaction transaction()
     {
         return operations -> {
-            BuilderCommon<List<CuratorTransactionResult>> common = new 
BuilderCommon<>(unhandledErrorListener, opResultsProc);
+            BuilderCommon<List<CuratorTransactionResult>> common = new 
BuilderCommon<>(filters, opResultsProc);
             CuratorMultiTransactionImpl builder = new 
CuratorMultiTransactionImpl(client, common.backgrounding);
             return safeCall(common.internalCallback, () -> 
builder.forOperations(operations));
         };
@@ -132,7 +135,7 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     public AsyncSyncBuilder sync()
     {
         return path -> {
-            BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, ignoredProc);
+            BuilderCommon<Void> common = new BuilderCommon<>(filters, 
ignoredProc);
             SyncBuilderImpl builder = new SyncBuilderImpl(client, 
common.backgrounding);
             return safeCall(common.internalCallback, () -> 
builder.forPath(path));
         };
@@ -141,7 +144,7 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     @Override
     public AsyncRemoveWatchesBuilder removeWatches()
     {
-        return new AsyncRemoveWatchesBuilderImpl(client, 
unhandledErrorListener);
+        return new AsyncRemoveWatchesBuilderImpl(client, filters);
     }
 
     @Override
@@ -153,19 +156,31 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     @Override
     public WatchableAsyncCuratorFramework watched()
     {
-        return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, 
WatchMode.stateChangeAndSuccess);
+        return new AsyncCuratorFrameworkImpl(client, filters, 
WatchMode.stateChangeAndSuccess);
     }
 
     @Override
     public WatchableAsyncCuratorFramework watched(WatchMode mode)
     {
-        return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, 
mode);
+        return new AsyncCuratorFrameworkImpl(client, filters, mode);
     }
 
     @Override
-    public AsyncCuratorFrameworkDsl 
withUnhandledErrorListener(UnhandledErrorListener listener)
+    public AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener)
     {
-        return new AsyncCuratorFrameworkImpl(client, listener, watchMode);
+        return new AsyncCuratorFrameworkImpl(client, new Filters(listener, 
filters.getResultFilter(), filters.getWatcherFilter()), watchMode);
+    }
+
+    @Override
+    public AsyncCuratorFrameworkDsl with(UnaryOperator<CuratorEvent> 
resultFilter, UnaryOperator<WatchedEvent> watcherFilter)
+    {
+        return new AsyncCuratorFrameworkImpl(client, new 
Filters(filters.getListener(), resultFilter, watcherFilter), watchMode);
+    }
+
+    @Override
+    public AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener, 
UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> 
watcherFilter)
+    {
+        return new AsyncCuratorFrameworkImpl(client, new Filters(listener, 
resultFilter, watcherFilter), watchMode);
     }
 
     @Override
@@ -177,24 +192,24 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     @Override
     public AsyncExistsBuilder checkExists()
     {
-        return new AsyncExistsBuilderImpl(client, unhandledErrorListener, 
watchMode);
+        return new AsyncExistsBuilderImpl(client, filters, watchMode);
     }
 
     @Override
     public AsyncGetDataBuilder getData()
     {
-        return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, 
watchMode);
+        return new AsyncGetDataBuilderImpl(client, filters, watchMode);
     }
 
     @Override
     public AsyncGetChildrenBuilder getChildren()
     {
-        return new AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, 
watchMode);
+        return new AsyncGetChildrenBuilderImpl(client, filters, watchMode);
     }
 
     @Override
     public AsyncGetConfigBuilder getConfig()
     {
-        return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, 
watchMode);
+        return new AsyncGetConfigBuilderImpl(client, filters, watchMode);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
index 243ea44..e9efb90 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java
@@ -18,12 +18,11 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.DeleteBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.AsyncDeleteBuilder;
 import org.apache.curator.x.async.api.AsyncPathable;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.DeleteOption;
 import java.util.Collections;
 import java.util.Objects;
@@ -35,14 +34,14 @@ import static 
org.apache.curator.x.async.details.BackgroundProcs.safeCall;
 class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private Set<DeleteOption> options = Collections.emptySet();
     private int version = -1;
 
-    AsyncDeleteBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener 
unhandledErrorListener)
+    AsyncDeleteBuilderImpl(CuratorFrameworkImpl client, Filters filters)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
     }
 
     @Override
@@ -69,7 +68,7 @@ class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder
     @Override
     public AsyncStage<Void> forPath(String path)
     {
-        BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, ignoredProc);
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
         DeleteBuilderImpl builder = new DeleteBuilderImpl(client, version, 
common.backgrounding, options.contains(DeleteOption.deletingChildrenIfNeeded), 
options.contains(DeleteOption.guaranteed), 
options.contains(DeleteOption.quietly));
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
index d3bb8ed..7a3385b 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
@@ -18,13 +18,12 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.ExistsBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.AsyncExistsBuilder;
 import org.apache.curator.x.async.api.AsyncPathable;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.ExistsOption;
 import org.apache.zookeeper.data.Stat;
 import java.util.Collections;
@@ -37,14 +36,14 @@ import static 
org.apache.curator.x.async.details.BackgroundProcs.safeStatProc;
 class AsyncExistsBuilderImpl implements AsyncExistsBuilder
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private final WatchMode watchMode;
     private Set<ExistsOption> options = Collections.emptySet();
 
-    AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener 
unhandledErrorListener, WatchMode watchMode)
+    AsyncExistsBuilderImpl(CuratorFrameworkImpl client, Filters filters, 
WatchMode watchMode)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
         this.watchMode = watchMode;
     }
 
@@ -58,7 +57,7 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder
     @Override
     public AsyncStage<Stat> forPath(String path)
     {
-        BuilderCommon<Stat> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, safeStatProc);
+        BuilderCommon<Stat> common = new BuilderCommon<>(filters, watchMode, 
safeStatProc);
         ExistsBuilderImpl builder = new ExistsBuilderImpl(client,
             common.backgrounding,
             common.watcher,

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
index 7258c6c..b98323f 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java
@@ -18,13 +18,12 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.GetChildrenBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.AsyncGetChildrenBuilder;
 import org.apache.curator.x.async.api.AsyncPathable;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
 
@@ -34,21 +33,21 @@ import static 
org.apache.curator.x.async.details.BackgroundProcs.safeCall;
 class AsyncGetChildrenBuilderImpl implements AsyncGetChildrenBuilder
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private final WatchMode watchMode;
     private Stat stat = null;
 
-    AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+    AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, Filters filters, 
WatchMode watchMode)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
         this.watchMode = watchMode;
     }
 
     @Override
     public AsyncStage<List<String>> forPath(String path)
     {
-        BuilderCommon<List<String>> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, childrenProc);
+        BuilderCommon<List<String>> common = new BuilderCommon<>(filters, 
watchMode, childrenProc);
         GetChildrenBuilderImpl builder = new GetChildrenBuilderImpl(client, 
common.watcher, common.backgrounding, stat);
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
index 273fba2..62272a7 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java
@@ -18,13 +18,12 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.GetConfigBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.AsyncEnsemblable;
 import org.apache.curator.x.async.api.AsyncGetConfigBuilder;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.data.Stat;
 
 import static org.apache.curator.x.async.details.BackgroundProcs.dataProc;
@@ -33,14 +32,14 @@ import static 
org.apache.curator.x.async.details.BackgroundProcs.safeCall;
 class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private final WatchMode watchMode;
     private Stat stat = null;
 
-    AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+    AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, Filters filters, 
WatchMode watchMode)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
         this.watchMode = watchMode;
     }
 
@@ -54,7 +53,7 @@ class AsyncGetConfigBuilderImpl implements 
AsyncGetConfigBuilder
     @Override
     public AsyncStage<byte[]> forEnsemble()
     {
-        BuilderCommon<byte[]> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, dataProc);
+        BuilderCommon<byte[]> common = new BuilderCommon<>(filters, watchMode, 
dataProc);
         GetConfigBuilderImpl builder = new GetConfigBuilderImpl(client, 
common.backgrounding, common.watcher, stat);
         return safeCall(common.internalCallback, builder::forEnsemble);
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
index ac9df4c..deca49a 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
@@ -18,13 +18,12 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.GetDataBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.api.AsyncGetDataBuilder;
 import org.apache.curator.x.async.api.AsyncPathable;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.data.Stat;
 
 import static org.apache.curator.x.async.details.BackgroundProcs.dataProc;
@@ -33,15 +32,15 @@ import static 
org.apache.curator.x.async.details.BackgroundProcs.safeCall;
 class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private final WatchMode watchMode;
     private boolean decompressed = false;
     private Stat stat = null;
 
-    AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener, WatchMode watchMode)
+    AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters, 
WatchMode watchMode)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
         this.watchMode = watchMode;
     }
 
@@ -70,7 +69,7 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder
     @Override
     public AsyncStage<byte[]> forPath(String path)
     {
-        BuilderCommon<byte[]> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, dataProc);
+        BuilderCommon<byte[]> common = new BuilderCommon<>(filters, watchMode, 
dataProc);
         GetDataBuilderImpl builder = new GetDataBuilderImpl(client, stat, 
common.watcher, common.backgrounding, decompressed);
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
index 32b9eb5..6114159 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java
@@ -18,12 +18,11 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.ReconfigBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.AsyncEnsemblable;
 import org.apache.curator.x.async.api.AsyncReconfigBuilder;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
 
@@ -33,17 +32,17 @@ import static 
org.apache.curator.x.async.details.BackgroundProcs.safeCall;
 class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, 
AsyncEnsemblable<AsyncStage<Void>>
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private Stat stat = null;
     private long fromConfig = -1;
     private List<String> newMembers = null;
     private List<String> joining = null;
     private List<String> leaving = null;
 
-    AsyncReconfigBuilderImpl(CuratorFrameworkImpl client, 
UnhandledErrorListener unhandledErrorListener)
+    AsyncReconfigBuilderImpl(CuratorFrameworkImpl client, Filters filters)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
     }
 
     @Override
@@ -116,7 +115,7 @@ class AsyncReconfigBuilderImpl implements 
AsyncReconfigBuilder, AsyncEnsemblable
     @Override
     public AsyncStage<Void> forEnsemble()
     {
-        BuilderCommon<Void> common = new 
BuilderCommon<>(unhandledErrorListener, ignoredProc);
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
         ReconfigBuilderImpl builder = new ReconfigBuilderImpl(client, 
common.backgrounding, stat, fromConfig, newMembers, joining, leaving);
         return safeCall(common.internalCallback, () -> {
             builder.forEnsemble();

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
index 98a8bbb..1f3ad79 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java
@@ -19,12 +19,11 @@
 package org.apache.curator.x.async.details;
 
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.RemoveWatchesBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.AsyncPathable;
 import org.apache.curator.x.async.api.AsyncRemoveWatchesBuilder;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.RemoveWatcherOption;
 import org.apache.zookeeper.Watcher;
 import java.util.Collections;
@@ -37,16 +36,16 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
 class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder, AsyncPathable<AsyncStage<Void>>
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private Watcher.WatcherType watcherType = Watcher.WatcherType.Any;
     private Set<RemoveWatcherOption> options = Collections.emptySet();
     private Watcher watcher = null;
     private CuratorWatcher curatorWatcher = null;
 
-    AsyncRemoveWatchesBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+    AsyncRemoveWatchesBuilderImpl(CuratorFrameworkImpl client, Filters filters)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
     }
 
     @Override
@@ -160,7 +159,7 @@ class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder, AsyncP
     @Override
     public AsyncStage<Void> forPath(String path)
     {
-        BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc);
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
         RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client,
             watcher,
             curatorWatcher,

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
index 8908de6..e639b9e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java
@@ -18,12 +18,11 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.SetACLBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.AsyncPathable;
 import org.apache.curator.x.async.api.AsyncSetACLBuilder;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import java.util.List;
@@ -34,14 +33,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.statProc;
 class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, AsyncPathable<AsyncStage<Stat>>
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private int version = -1;
     private List<ACL> aclList = null;
 
-    AsyncSetACLBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+    AsyncSetACLBuilderImpl(CuratorFrameworkImpl client, Filters filters)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
     }
 
     @Override
@@ -62,7 +61,7 @@ class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, AsyncPathable<AsyncS
     @Override
     public AsyncStage<Stat> forPath(String path)
     {
-        BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc);
+        BuilderCommon<Stat> common = new BuilderCommon<>(filters, statProc);
         SetACLBuilderImpl builder = new SetACLBuilderImpl(client, common.backgrounding, aclList, version);
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
index cf2a56e..750fd59 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
@@ -18,12 +18,11 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.SetDataBuilderImpl;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.api.AsyncPathAndBytesable;
 import org.apache.curator.x.async.api.AsyncSetDataBuilder;
-import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.data.Stat;
 
 import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
@@ -32,14 +31,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.statProc;
 class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder
 {
     private final CuratorFrameworkImpl client;
-    private final UnhandledErrorListener unhandledErrorListener;
+    private final Filters filters;
     private boolean compressed = false;
     private int version = -1;
 
-    AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener)
+    AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters)
     {
         this.client = client;
-        this.unhandledErrorListener = unhandledErrorListener;
+        this.filters = filters;
     }
 
     @Override
@@ -78,7 +77,7 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder
 
     private AsyncStage<Stat> internalForPath(String path, byte[] data, boolean useData)
     {
-        BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc);
+        BuilderCommon<Stat> common = new BuilderCommon<>(filters, statProc);
         SetDataBuilderImpl builder = new SetDataBuilderImpl(client, common.backgrounding, version, compressed);
         return safeCall(common.internalCallback, () -> useData ? builder.forPath(path, data) : builder.forPath(path));
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
index 043b5b4..82cd244 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.async.details;
 
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.Backgrounding;
 import org.apache.curator.x.async.WatchMode;
 
@@ -28,15 +27,15 @@ class BuilderCommon<T>
     final Backgrounding backgrounding;
     final InternalWatcher watcher;
 
-    BuilderCommon(UnhandledErrorListener unhandledErrorListener, BackgroundProc<T> proc)
+    BuilderCommon(Filters filters, BackgroundProc<T> proc)
     {
-        this(unhandledErrorListener, null, proc);
+        this(filters,null, proc);
     }
 
-    BuilderCommon(UnhandledErrorListener unhandledErrorListener, WatchMode watchMode, BackgroundProc<T> proc)
+    BuilderCommon(Filters filters, WatchMode watchMode, BackgroundProc<T> proc)
     {
-        watcher = (watchMode != null) ? new InternalWatcher(watchMode) : null;
-        internalCallback = new InternalCallback<>(proc, watcher);
-        backgrounding = new Backgrounding(internalCallback, unhandledErrorListener);
+        watcher = (watchMode != null) ? new InternalWatcher(watchMode, filters.getWatcherFilter()) : null;
+        internalCallback = new InternalCallback<>(proc, watcher, filters.getResultFilter());
+        backgrounding = new Backgrounding(internalCallback, filters.getListener());
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java
new file mode 100644
index 0000000..ab46590
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.function.UnaryOperator;
+
+public class Filters
+{
+    private final UnhandledErrorListener listener;
+    private final UnaryOperator<CuratorEvent> resultFilter;
+    private final UnaryOperator<WatchedEvent> watcherFilter;
+
+    public Filters(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter)
+    {
+        this.listener = listener;
+        this.resultFilter = resultFilter;
+        this.watcherFilter = watcherFilter;
+    }
+
+    public UnhandledErrorListener getListener()
+    {
+        return listener;
+    }
+
+    public UnaryOperator<CuratorEvent> getResultFilter()
+    {
+        return resultFilter;
+    }
+
+    public UnaryOperator<WatchedEvent> getWatcherFilter()
+    {
+        return watcherFilter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
index 505226f..d25c736 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java
@@ -25,16 +25,19 @@ import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.WatchedEvent;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.function.UnaryOperator;
 
 class InternalCallback<T> extends CompletableFuture<T> implements BackgroundCallback, AsyncStage<T>
 {
     private final BackgroundProc<T> resultFunction;
     private final InternalWatcher watcher;
+    private final UnaryOperator<CuratorEvent> resultFilter;
 
-    InternalCallback(BackgroundProc<T> resultFunction, InternalWatcher watcher)
+    InternalCallback(BackgroundProc<T> resultFunction, InternalWatcher watcher, UnaryOperator<CuratorEvent> resultFilter)
     {
         this.resultFunction = resultFunction;
         this.watcher = watcher;
+        this.resultFilter = resultFilter;
     }
 
     @Override
@@ -46,6 +49,7 @@ class InternalCallback<T> extends CompletableFuture<T> implements BackgroundCall
     @Override
     public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
     {
+        event = (resultFilter != null) ? resultFilter.apply(event) : event;
         resultFunction.apply(event, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
index 2c7de9e..7578083 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java
@@ -24,15 +24,18 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.function.UnaryOperator;
 
 class InternalWatcher implements Watcher
 {
     private final WatchMode watchMode;
+    private final UnaryOperator<WatchedEvent> watcherFilter;
     private volatile CompletableFuture<WatchedEvent> future = new CompletableFuture<>();
 
-    InternalWatcher(WatchMode watchMode)
+    InternalWatcher(WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter)
     {
         this.watchMode = watchMode;
+        this.watcherFilter = watcherFilter;
     }
 
     CompletableFuture<WatchedEvent> getFuture()
@@ -43,15 +46,16 @@ class InternalWatcher implements Watcher
     @Override
     public void process(WatchedEvent event)
     {
-        switch ( event.getState() )
+        final WatchedEvent localEvent = (watcherFilter != null) ? watcherFilter.apply(event) : event;
+        switch ( localEvent.getState() )
         {
             default:
             {
-                if ( (watchMode != WatchMode.stateChangeOnly) && (event.getType() != Event.EventType.None) )
+                if ( (watchMode != WatchMode.stateChangeOnly) && (localEvent.getType() != Event.EventType.None) )
                 {
-                    if ( !future.complete(event) )
+                    if ( !future.complete(localEvent) )
                     {
-                        future.obtrudeValue(event);
+                        future.obtrudeValue(localEvent);
                     }
                 }
                 break;
@@ -68,7 +72,7 @@ class InternalWatcher implements Watcher
                         @Override
                         public Event.KeeperState getKeeperState()
                         {
-                            return event.getState();
+                            return localEvent.getState();
                         }
 
                         @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index 52c3faa..c00febd 100644
--- a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -101,7 +101,7 @@ public class TestFrameworkBackground extends BaseClassForTests
                     errorLatch.countDown();
                 }
             };
-            async.withUnhandledErrorListener(listener).create().forPath("/foo");
+            async.with(listener).create().forPath("/foo");
             Assert.assertTrue(new Timing().awaitLatch(errorLatch));
         }
         finally

Reply via email to