Added a filtering feature plus some refactoring
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ee4031de Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ee4031de Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ee4031de Branch: refs/heads/CURATOR-3.0 Commit: ee4031de78ab06494013e47911896e4e689ba131 Parents: 3876d3e Author: randgalt <randg...@apache.org> Authored: Sat Jan 7 15:25:54 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Sat Jan 7 15:25:54 2017 -0500 ---------------------------------------------------------------------- .../curator/x/async/AsyncCuratorFramework.java | 29 +++++++++- .../x/async/details/AsyncCreateBuilderImpl.java | 11 ++-- .../details/AsyncCuratorFrameworkImpl.java | 59 ++++++++++++-------- .../x/async/details/AsyncDeleteBuilderImpl.java | 11 ++-- .../x/async/details/AsyncExistsBuilderImpl.java | 11 ++-- .../details/AsyncGetChildrenBuilderImpl.java | 11 ++-- .../details/AsyncGetConfigBuilderImpl.java | 11 ++-- .../async/details/AsyncGetDataBuilderImpl.java | 11 ++-- .../async/details/AsyncReconfigBuilderImpl.java | 11 ++-- .../details/AsyncRemoveWatchesBuilderImpl.java | 11 ++-- .../x/async/details/AsyncSetACLBuilderImpl.java | 11 ++-- .../async/details/AsyncSetDataBuilderImpl.java | 11 ++-- .../curator/x/async/details/BuilderCommon.java | 13 ++--- .../apache/curator/x/async/details/Filters.java | 53 ++++++++++++++++++ .../x/async/details/InternalCallback.java | 6 +- .../x/async/details/InternalWatcher.java | 16 ++++-- .../framework/imps/TestFrameworkBackground.java | 2 +- 17 files changed, 189 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java ---------------------------------------------------------------------- diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java index 91784b0..183a5eb 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncCuratorFramework.java @@ -19,10 +19,12 @@ package org.apache.curator.x.async; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.x.async.api.AsyncCuratorFrameworkDsl; -import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework; import org.apache.curator.x.async.details.AsyncCuratorFrameworkImpl; +import org.apache.zookeeper.WatchedEvent; +import java.util.function.UnaryOperator; /** * Zookeeper framework-style client that returns composable async operations @@ -58,5 +60,28 @@ public interface AsyncCuratorFramework extends AsyncCuratorFrameworkDsl * @param listener lister to use * @return facade */ - AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener listener); + AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener); + + /** + * Returns a facade that adds the given filters to all background operations and watchers. + * <code>resultFilter</code> will get called for every background callback. <code>watcherFilter</code> + * will get called for every watcher. The filters can return new versions or unchanged versions + * of the arguments. 
+ * + * @param resultFilter filter to use or <code>null</code> + * @param watcherFilter filter to use or <code>null</code> + * @return facade + */ + AsyncCuratorFrameworkDsl with(UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter); + + /** + * Set any combination of listener or filters + * + * @param resultFilter filter to use or <code>null</code> + * @param watcherFilter filter to use or <code>null</code> + * @see #with(java.util.function.UnaryOperator, java.util.function.UnaryOperator) + * @see #with(org.apache.curator.framework.api.UnhandledErrorListener) + * @return facade + */ + AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter); } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java index b3c91b3..b2b9000 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java @@ -18,12 +18,11 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CreateBuilderImpl; import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.AsyncCreateBuilder; import org.apache.curator.x.async.api.AsyncPathAndBytesable; -import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.CreateOption; import org.apache.zookeeper.CreateMode; import 
org.apache.zookeeper.data.ACL; @@ -39,16 +38,16 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; class AsyncCreateBuilderImpl implements AsyncCreateBuilder { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private CreateMode createMode = CreateMode.PERSISTENT; private List<ACL> aclList = null; private Set<CreateOption> options = Collections.emptySet(); private Stat stat = null; - AsyncCreateBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + AsyncCreateBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; } @Override @@ -128,7 +127,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder private AsyncStage<String> internalForPath(String path, byte[] data, boolean useData) { - BuilderCommon<String> common = new BuilderCommon<>(unhandledErrorListener, nameProc); + BuilderCommon<String> common = new BuilderCommon<>(filters, nameProc); CreateBuilderImpl builder = new CreateBuilderImpl(client, createMode, common.backgrounding, http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java index a6101f2..aa82644 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java @@ -19,6 +19,7 @@ package org.apache.curator.x.async.details; import org.apache.curator.framework.CuratorFramework; 
+import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.framework.imps.CuratorFrameworkImpl; @@ -27,22 +28,24 @@ import org.apache.curator.framework.imps.GetACLBuilderImpl; import org.apache.curator.framework.imps.SyncBuilderImpl; import org.apache.curator.x.async.*; import org.apache.curator.x.async.api.*; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import java.util.List; import java.util.Objects; +import java.util.function.UnaryOperator; import static org.apache.curator.x.async.details.BackgroundProcs.*; public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private final WatchMode watchMode; public AsyncCuratorFrameworkImpl(CuratorFramework client) { - this(reveal(client), null, null); + this(reveal(client), new Filters(null, null, null), null); } private static CuratorFrameworkImpl reveal(CuratorFramework client) @@ -57,29 +60,29 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } } - public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) + public AsyncCuratorFrameworkImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode) { - this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.filters = Objects.requireNonNull(filters, "filters cannot be null"); this.watchMode = watchMode; } @Override public AsyncCreateBuilder create() { - return new AsyncCreateBuilderImpl(client, unhandledErrorListener); + return new AsyncCreateBuilderImpl(client, filters); } 
@Override public AsyncDeleteBuilder delete() { - return new AsyncDeleteBuilderImpl(client, unhandledErrorListener); + return new AsyncDeleteBuilderImpl(client, filters); } @Override public AsyncSetDataBuilder setData() { - return new AsyncSetDataBuilderImpl(client, unhandledErrorListener); + return new AsyncSetDataBuilderImpl(client, filters); } @Override @@ -99,7 +102,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework @Override public AsyncStage<List<ACL>> forPath(String path) { - BuilderCommon<List<ACL>> common = new BuilderCommon<>(unhandledErrorListener, aclProc); + BuilderCommon<List<ACL>> common = new BuilderCommon<>(filters, aclProc); GetACLBuilderImpl builder = new GetACLBuilderImpl(client, common.backgrounding, stat); return safeCall(common.internalCallback, () -> builder.forPath(path)); } @@ -109,20 +112,20 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework @Override public AsyncSetACLBuilder setACL() { - return new AsyncSetACLBuilderImpl(client, unhandledErrorListener); + return new AsyncSetACLBuilderImpl(client, filters); } @Override public AsyncReconfigBuilder reconfig() { - return new AsyncReconfigBuilderImpl(client, unhandledErrorListener); + return new AsyncReconfigBuilderImpl(client, filters); } @Override public AsyncMultiTransaction transaction() { return operations -> { - BuilderCommon<List<CuratorTransactionResult>> common = new BuilderCommon<>(unhandledErrorListener, opResultsProc); + BuilderCommon<List<CuratorTransactionResult>> common = new BuilderCommon<>(filters, opResultsProc); CuratorMultiTransactionImpl builder = new CuratorMultiTransactionImpl(client, common.backgrounding); return safeCall(common.internalCallback, () -> builder.forOperations(operations)); }; @@ -132,7 +135,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework public AsyncSyncBuilder sync() { return path -> { - BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc); + 
BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); SyncBuilderImpl builder = new SyncBuilderImpl(client, common.backgrounding); return safeCall(common.internalCallback, () -> builder.forPath(path)); }; @@ -141,7 +144,7 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework @Override public AsyncRemoveWatchesBuilder removeWatches() { - return new AsyncRemoveWatchesBuilderImpl(client, unhandledErrorListener); + return new AsyncRemoveWatchesBuilderImpl(client, filters); } @Override @@ -153,19 +156,31 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework @Override public WatchableAsyncCuratorFramework watched() { - return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, WatchMode.stateChangeAndSuccess); + return new AsyncCuratorFrameworkImpl(client, filters, WatchMode.stateChangeAndSuccess); } @Override public WatchableAsyncCuratorFramework watched(WatchMode mode) { - return new AsyncCuratorFrameworkImpl(client, unhandledErrorListener, mode); + return new AsyncCuratorFrameworkImpl(client, filters, mode); } @Override - public AsyncCuratorFrameworkDsl withUnhandledErrorListener(UnhandledErrorListener listener) + public AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener) { - return new AsyncCuratorFrameworkImpl(client, listener, watchMode); + return new AsyncCuratorFrameworkImpl(client, new Filters(listener, filters.getResultFilter(), filters.getWatcherFilter()), watchMode); + } + + @Override + public AsyncCuratorFrameworkDsl with(UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) + { + return new AsyncCuratorFrameworkImpl(client, new Filters(filters.getListener(), resultFilter, watcherFilter), watchMode); + } + + @Override + public AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) + { + return new AsyncCuratorFrameworkImpl(client, new Filters(listener, 
resultFilter, watcherFilter), watchMode); } @Override @@ -177,24 +192,24 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework @Override public AsyncExistsBuilder checkExists() { - return new AsyncExistsBuilderImpl(client, unhandledErrorListener, watchMode); + return new AsyncExistsBuilderImpl(client, filters, watchMode); } @Override public AsyncGetDataBuilder getData() { - return new AsyncGetDataBuilderImpl(client, unhandledErrorListener, watchMode); + return new AsyncGetDataBuilderImpl(client, filters, watchMode); } @Override public AsyncGetChildrenBuilder getChildren() { - return new AsyncGetChildrenBuilderImpl(client, unhandledErrorListener, watchMode); + return new AsyncGetChildrenBuilderImpl(client, filters, watchMode); } @Override public AsyncGetConfigBuilder getConfig() { - return new AsyncGetConfigBuilderImpl(client, unhandledErrorListener, watchMode); + return new AsyncGetConfigBuilderImpl(client, filters, watchMode); } } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java index 243ea44..e9efb90 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncDeleteBuilderImpl.java @@ -18,12 +18,11 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.DeleteBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.AsyncDeleteBuilder; import 
org.apache.curator.x.async.api.AsyncPathable; -import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.DeleteOption; import java.util.Collections; import java.util.Objects; @@ -35,14 +34,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private Set<DeleteOption> options = Collections.emptySet(); private int version = -1; - AsyncDeleteBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + AsyncDeleteBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; } @Override @@ -69,7 +68,7 @@ class AsyncDeleteBuilderImpl implements AsyncDeleteBuilder @Override public AsyncStage<Void> forPath(String path) { - BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc); + BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); DeleteBuilderImpl builder = new DeleteBuilderImpl(client, version, common.backgrounding, options.contains(DeleteOption.deletingChildrenIfNeeded), options.contains(DeleteOption.guaranteed), options.contains(DeleteOption.quietly)); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java index d3bb8ed..7a3385b 100644 --- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java @@ -18,13 +18,12 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.ExistsBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.AsyncExistsBuilder; import org.apache.curator.x.async.api.AsyncPathable; -import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.ExistsOption; import org.apache.zookeeper.data.Stat; import java.util.Collections; @@ -37,14 +36,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeStatProc; class AsyncExistsBuilderImpl implements AsyncExistsBuilder { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private final WatchMode watchMode; private Set<ExistsOption> options = Collections.emptySet(); - AsyncExistsBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) + AsyncExistsBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; this.watchMode = watchMode; } @@ -58,7 +57,7 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder @Override public AsyncStage<Stat> forPath(String path) { - BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, watchMode, safeStatProc); + BuilderCommon<Stat> common = new BuilderCommon<>(filters, watchMode, safeStatProc); ExistsBuilderImpl builder = new ExistsBuilderImpl(client, common.backgrounding, common.watcher, 
http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java index 7258c6c..b98323f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetChildrenBuilderImpl.java @@ -18,13 +18,12 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.GetChildrenBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.AsyncGetChildrenBuilder; import org.apache.curator.x.async.api.AsyncPathable; -import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.data.Stat; import java.util.List; @@ -34,21 +33,21 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; class AsyncGetChildrenBuilderImpl implements AsyncGetChildrenBuilder { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private final WatchMode watchMode; private Stat stat = null; - AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) + AsyncGetChildrenBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; this.watchMode = watchMode; } @Override public 
AsyncStage<List<String>> forPath(String path) { - BuilderCommon<List<String>> common = new BuilderCommon<>(unhandledErrorListener, watchMode, childrenProc); + BuilderCommon<List<String>> common = new BuilderCommon<>(filters, watchMode, childrenProc); GetChildrenBuilderImpl builder = new GetChildrenBuilderImpl(client, common.watcher, common.backgrounding, stat); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java index 273fba2..62272a7 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetConfigBuilderImpl.java @@ -18,13 +18,12 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.GetConfigBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.AsyncEnsemblable; import org.apache.curator.x.async.api.AsyncGetConfigBuilder; -import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.data.Stat; import static org.apache.curator.x.async.details.BackgroundProcs.dataProc; @@ -33,14 +32,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener 
unhandledErrorListener; + private final Filters filters; private final WatchMode watchMode; private Stat stat = null; - AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) + AsyncGetConfigBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; this.watchMode = watchMode; } @@ -54,7 +53,7 @@ class AsyncGetConfigBuilderImpl implements AsyncGetConfigBuilder @Override public AsyncStage<byte[]> forEnsemble() { - BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watchMode, dataProc); + BuilderCommon<byte[]> common = new BuilderCommon<>(filters, watchMode, dataProc); GetConfigBuilderImpl builder = new GetConfigBuilderImpl(client, common.backgrounding, common.watcher, stat); return safeCall(common.internalCallback, builder::forEnsemble); } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java index ac9df4c..deca49a 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java @@ -18,13 +18,12 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.GetDataBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; import 
org.apache.curator.x.async.api.AsyncGetDataBuilder; import org.apache.curator.x.async.api.AsyncPathable; -import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.data.Stat; import static org.apache.curator.x.async.details.BackgroundProcs.dataProc; @@ -33,15 +32,15 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private final WatchMode watchMode; private boolean decompressed = false; private Stat stat = null; - AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener, WatchMode watchMode) + AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; this.watchMode = watchMode; } @@ -70,7 +69,7 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder @Override public AsyncStage<byte[]> forPath(String path) { - BuilderCommon<byte[]> common = new BuilderCommon<>(unhandledErrorListener, watchMode, dataProc); + BuilderCommon<byte[]> common = new BuilderCommon<>(filters, watchMode, dataProc); GetDataBuilderImpl builder = new GetDataBuilderImpl(client, stat, common.watcher, common.backgrounding, decompressed); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java index 32b9eb5..6114159 100644 --- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncReconfigBuilderImpl.java @@ -18,12 +18,11 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.ReconfigBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.AsyncEnsemblable; import org.apache.curator.x.async.api.AsyncReconfigBuilder; -import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.data.Stat; import java.util.List; @@ -33,17 +32,17 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, AsyncEnsemblable<AsyncStage<Void>> { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private Stat stat = null; private long fromConfig = -1; private List<String> newMembers = null; private List<String> joining = null; private List<String> leaving = null; - AsyncReconfigBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + AsyncReconfigBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; } @Override @@ -116,7 +115,7 @@ class AsyncReconfigBuilderImpl implements AsyncReconfigBuilder, AsyncEnsemblable @Override public AsyncStage<Void> forEnsemble() { - BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc); + BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); ReconfigBuilderImpl builder = new ReconfigBuilderImpl(client, common.backgrounding, stat, fromConfig, newMembers, joining, leaving); return 
safeCall(common.internalCallback, () -> { builder.forEnsemble(); http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java index 98a8bbb..1f3ad79 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncRemoveWatchesBuilderImpl.java @@ -19,12 +19,11 @@ package org.apache.curator.x.async.details; import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.RemoveWatchesBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.AsyncPathable; import org.apache.curator.x.async.api.AsyncRemoveWatchesBuilder; -import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.RemoveWatcherOption; import org.apache.zookeeper.Watcher; import java.util.Collections; @@ -37,16 +36,16 @@ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder, AsyncPathable<AsyncStage<Void>> { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private Watcher.WatcherType watcherType = Watcher.WatcherType.Any; private Set<RemoveWatcherOption> options = Collections.emptySet(); private Watcher watcher = null; private CuratorWatcher curatorWatcher = null; - 
AsyncRemoveWatchesBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + AsyncRemoveWatchesBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; } @Override @@ -160,7 +159,7 @@ class AsyncRemoveWatchesBuilderImpl implements AsyncRemoveWatchesBuilder, AsyncP @Override public AsyncStage<Void> forPath(String path) { - BuilderCommon<Void> common = new BuilderCommon<>(unhandledErrorListener, ignoredProc); + BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client, watcher, curatorWatcher, http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java index 8908de6..e639b9e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetACLBuilderImpl.java @@ -18,12 +18,11 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.SetACLBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.AsyncPathable; import org.apache.curator.x.async.api.AsyncSetACLBuilder; -import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import java.util.List; @@ -34,14 +33,14 @@ import static 
org.apache.curator.x.async.details.BackgroundProcs.statProc; class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, AsyncPathable<AsyncStage<Stat>> { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private int version = -1; private List<ACL> aclList = null; - AsyncSetACLBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + AsyncSetACLBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; } @Override @@ -62,7 +61,7 @@ class AsyncSetACLBuilderImpl implements AsyncSetACLBuilder, AsyncPathable<AsyncS @Override public AsyncStage<Stat> forPath(String path) { - BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc); + BuilderCommon<Stat> common = new BuilderCommon<>(filters, statProc); SetACLBuilderImpl builder = new SetACLBuilderImpl(client, common.backgrounding, aclList, version); return safeCall(common.internalCallback, () -> builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java index cf2a56e..750fd59 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java @@ -18,12 +18,11 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import 
org.apache.curator.framework.imps.SetDataBuilderImpl; +import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.AsyncPathAndBytesable; import org.apache.curator.x.async.api.AsyncSetDataBuilder; -import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.data.Stat; import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; @@ -32,14 +31,14 @@ import static org.apache.curator.x.async.details.BackgroundProcs.statProc; class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder { private final CuratorFrameworkImpl client; - private final UnhandledErrorListener unhandledErrorListener; + private final Filters filters; private boolean compressed = false; private int version = -1; - AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, UnhandledErrorListener unhandledErrorListener) + AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; - this.unhandledErrorListener = unhandledErrorListener; + this.filters = filters; } @Override @@ -78,7 +77,7 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder private AsyncStage<Stat> internalForPath(String path, byte[] data, boolean useData) { - BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, statProc); + BuilderCommon<Stat> common = new BuilderCommon<>(filters, statProc); SetDataBuilderImpl builder = new SetDataBuilderImpl(client, common.backgrounding, version, compressed); return safeCall(common.internalCallback, () -> useData ? 
builder.forPath(path, data) : builder.forPath(path)); } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java index 043b5b4..82cd244 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/BuilderCommon.java @@ -18,7 +18,6 @@ */ package org.apache.curator.x.async.details; -import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.Backgrounding; import org.apache.curator.x.async.WatchMode; @@ -28,15 +27,15 @@ class BuilderCommon<T> final Backgrounding backgrounding; final InternalWatcher watcher; - BuilderCommon(UnhandledErrorListener unhandledErrorListener, BackgroundProc<T> proc) + BuilderCommon(Filters filters, BackgroundProc<T> proc) { - this(unhandledErrorListener, null, proc); + this(filters,null, proc); } - BuilderCommon(UnhandledErrorListener unhandledErrorListener, WatchMode watchMode, BackgroundProc<T> proc) + BuilderCommon(Filters filters, WatchMode watchMode, BackgroundProc<T> proc) { - watcher = (watchMode != null) ? new InternalWatcher(watchMode) : null; - internalCallback = new InternalCallback<>(proc, watcher); - backgrounding = new Backgrounding(internalCallback, unhandledErrorListener); + watcher = (watchMode != null) ? 
new InternalWatcher(watchMode, filters.getWatcherFilter()) : null; + internalCallback = new InternalCallback<>(proc, watcher, filters.getResultFilter()); + backgrounding = new Backgrounding(internalCallback, filters.getListener()); } } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java new file mode 100644 index 0000000..ab46590 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/Filters.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.zookeeper.WatchedEvent; +import java.util.function.UnaryOperator; + +public class Filters +{ + private final UnhandledErrorListener listener; + private final UnaryOperator<CuratorEvent> resultFilter; + private final UnaryOperator<WatchedEvent> watcherFilter; + + public Filters(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) + { + this.listener = listener; + this.resultFilter = resultFilter; + this.watcherFilter = watcherFilter; + } + + public UnhandledErrorListener getListener() + { + return listener; + } + + public UnaryOperator<CuratorEvent> getResultFilter() + { + return resultFilter; + } + + public UnaryOperator<WatchedEvent> getWatcherFilter() + { + return watcherFilter; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java index 505226f..d25c736 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalCallback.java @@ -25,16 +25,19 @@ import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.WatchedEvent; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.UnaryOperator; class InternalCallback<T> extends CompletableFuture<T> implements BackgroundCallback, AsyncStage<T> { private final BackgroundProc<T> resultFunction; private final InternalWatcher 
watcher; + private final UnaryOperator<CuratorEvent> resultFilter; - InternalCallback(BackgroundProc<T> resultFunction, InternalWatcher watcher) + InternalCallback(BackgroundProc<T> resultFunction, InternalWatcher watcher, UnaryOperator<CuratorEvent> resultFilter) { this.resultFunction = resultFunction; this.watcher = watcher; + this.resultFilter = resultFilter; } @Override @@ -46,6 +49,7 @@ class InternalCallback<T> extends CompletableFuture<T> implements BackgroundCall @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + event = (resultFilter != null) ? resultFilter.apply(event) : event; resultFunction.apply(event, this); } } http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java index 2c7de9e..7578083 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/InternalWatcher.java @@ -24,15 +24,18 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.UnaryOperator; class InternalWatcher implements Watcher { private final WatchMode watchMode; + private final UnaryOperator<WatchedEvent> watcherFilter; private volatile CompletableFuture<WatchedEvent> future = new CompletableFuture<>(); - InternalWatcher(WatchMode watchMode) + InternalWatcher(WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter) { this.watchMode = watchMode; + this.watcherFilter = watcherFilter; } CompletableFuture<WatchedEvent> getFuture() @@ 
-43,15 +46,16 @@ class InternalWatcher implements Watcher @Override public void process(WatchedEvent event) { - switch ( event.getState() ) + final WatchedEvent localEvent = (watcherFilter != null) ? watcherFilter.apply(event) : event; + switch ( localEvent.getState() ) { default: { - if ( (watchMode != WatchMode.stateChangeOnly) && (event.getType() != Event.EventType.None) ) + if ( (watchMode != WatchMode.stateChangeOnly) && (localEvent.getType() != Event.EventType.None) ) { - if ( !future.complete(event) ) + if ( !future.complete(localEvent) ) { - future.obtrudeValue(event); + future.obtrudeValue(localEvent); } } break; @@ -68,7 +72,7 @@ class InternalWatcher implements Watcher @Override public Event.KeeperState getKeeperState() { - return event.getState(); + return localEvent.getState(); } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/ee4031de/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java index 52c3faa..c00febd 100644 --- a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java +++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java @@ -101,7 +101,7 @@ public class TestFrameworkBackground extends BaseClassForTests errorLatch.countDown(); } }; - async.withUnhandledErrorListener(listener).create().forPath("/foo"); + async.with(listener).create().forPath("/foo"); Assert.assertTrue(new Timing().awaitLatch(errorLatch)); } finally