Repository: curator Updated Branches: refs/heads/persistent-watch 32a2fb759 -> 94a0205d4
Finished addPersistentWatcher DSL, re-wrote new version of cache code to handle all cases and deprecated other versions Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/94a0205d Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/94a0205d Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/94a0205d Branch: refs/heads/persistent-watch Commit: 94a0205d4c3d34b1e1384ab5af1b997f74d2a912 Parents: 32a2fb7 Author: randgalt <randg...@apache.org> Authored: Wed Dec 28 23:10:15 2016 -0500 Committer: randgalt <randg...@apache.org> Committed: Wed Dec 28 23:10:15 2016 -0500 ---------------------------------------------------------------------- .../curator/framework/api/CuratorEventType.java | 7 +- .../imps/AddPersistentWatchBuilderImpl.java | 166 ++++++++++ .../framework/imps/CuratorFrameworkImpl.java | 2 +- .../framework/recipes/cache/NodeCache.java | 4 + .../recipes/cache/PathChildrenCache.java | 4 + .../recipes/cache/PersistentWatcherCache.java | 317 ------------------- .../cache/PersistentWatcherCacheFilter.java | 14 - .../cache/PersistentWatcherCacheListener.java | 12 - .../framework/recipes/cache/TreeCache.java | 4 + .../recipes/nodes/PersistentWatcher.java | 104 ------ .../framework/recipes/watch/CacheAction.java | 27 ++ .../framework/recipes/watch/CacheEventType.java | 27 ++ .../framework/recipes/watch/CacheFilter.java | 24 ++ .../framework/recipes/watch/CacheListener.java | 24 ++ .../framework/recipes/watch/CachedNode.java | 93 ++++++ .../framework/recipes/watch/CuratorCache.java | 123 +++++++ .../recipes/watch/CuratorCacheBase.java | 105 ++++++ .../recipes/watch/CuratorCacheBuilder.java | 98 ++++++ .../recipes/watch/InternalCuratorCache.java | 232 ++++++++++++++ .../recipes/watch/InternalNodeCache.java | 302 ++++++++++++++++++ .../recipes/watch/PersistentWatcher.java | 140 ++++++++ .../recipes/watch/SingleLevelCacheFilter.java | 51 +++ .../recipes/watch/StatsOnlyCacheFilter.java | 28 
++ 23 files changed, 1459 insertions(+), 449 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java index 5dea211..4766ca5 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java @@ -96,5 +96,10 @@ public enum CuratorEventType /** * Event sent when client is being closed */ - CLOSING + CLOSING, + + /** + * Corresponds to {@link CuratorFramework#addPersistentWatch()} + */ + ADD_PERSISTENT_WATCH } http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java new file mode 100644 index 0000000..bf4dfb6 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.RetryLoop; +import org.apache.curator.drivers.OperationTrace; +import org.apache.curator.framework.api.AddPersistentWatchBuilder; +import org.apache.curator.framework.api.AddPersistentWatchable; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.Pathable; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.Watcher; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; + +class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String> +{ + private final CuratorFrameworkImpl client; + private Watching watching = null; + private Backgrounding backgrounding = new Backgrounding(); + + AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client) + { + this.client = client; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground() + { + backgrounding = new Backgrounding(); + return this; + } + + @Override + public Pathable<Void> usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public Pathable<Void> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(Object 
context) + { + backgrounding = new Backgrounding(context); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback) + { + backgrounding = new Backgrounding(callback); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context) + { + backgrounding = new Backgrounding(callback, context); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor) + { + backgrounding = new Backgrounding(callback, executor); + return this; + } + + @Override + public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor) + { + backgrounding = new Backgrounding(client, callback, context, executor); + return this; + } + + @Override + public Void forPath(String path) throws Exception + { + if ( backgrounding.inBackground() ) + { + client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); + } + else + { + pathInForeground(path); + } + return null; + } + + @Override + public void performBackgroundOperation(final OperationAndData<String> data) throws Exception + { + String path = data.getData(); + String fixedPath = client.fixForNamespace(path); + try + { + final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background"); + client.getZooKeeper().addPersistentWatch + ( + fixedPath, + watching.getWatcher(path), + new AsyncCallback.VoidCallback() + { + @Override + public void processResult(int rc, String path, Object ctx) + { + trace.setReturnCode(rc).setWithWatcher(true).setPath(path).commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path, null, ctx, null, null, null, null, null, null); + 
client.processBackgroundOperation(data, event); + } + }, + backgrounding.getContext() + ); + } + catch ( Throwable e ) + { + backgrounding.checkError(e, watching); + } + } + + private void pathInForeground(final String path) throws Exception + { + final String fixedPath = client.fixForNamespace(path); + OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground"); + RetryLoop.callWithRetry + ( + client.getZookeeperClient(), + new Callable<Void>() + { + @Override + public Void call() throws Exception + { + client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path)); + return null; + } + } + ); + trace.setPath(fixedPath).setWithWatcher(true).commit(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 04113fd..dd995e5 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -563,7 +563,7 @@ public class CuratorFrameworkImpl implements CuratorFramework @Override public AddPersistentWatchBuilder addPersistentWatch() { - return null; // TODO + return new AddPersistentWatchBuilderImpl(this); } ACLProvider getAclProvider() http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java index 9a6eaa7..b288f2a 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.WatcherRemoveCuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.watch.CuratorCache; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.PathUtils; @@ -53,7 +54,10 @@ import java.util.concurrent.atomic.AtomicReference; * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must * be prepared for false-positives and false-negatives. 
Additionally, always use the version number * when updating data to avoid overwriting another process' change.</p> + * + * @deprecated use {@link CuratorCache} */ +@Deprecated public class NodeCache implements Closeable { private final Logger log = LoggerFactory.getLogger(getClass()); http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index 91a3a98..c5b9eba 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -32,6 +32,7 @@ import org.apache.curator.framework.EnsureContainers; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.watch.CuratorCache; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.CloseableExecutorService; @@ -61,7 +62,10 @@ import java.util.concurrent.atomic.AtomicReference; * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must * be prepared for false-positives and false-negatives. 
Additionally, always use the version number * when updating data to avoid overwriting another process' change.</p> + * + * @deprecated use {@link CuratorCache} */ +@Deprecated @SuppressWarnings("NullableProblems") public class PathChildrenCache implements Closeable { http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java deleted file mode 100644 index 0e7a448..0000000 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCache.java +++ /dev/null @@ -1,317 +0,0 @@ -package org.apache.curator.framework.recipes.cache; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.listen.ListenerContainer; -import org.apache.curator.framework.recipes.nodes.PersistentWatcher; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import java.io.Closeable; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; - -public class PersistentWatcherCache implements Closeable -{ - private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); - private final PersistentWatcher 
persistentWatcher; - private final ListenerContainer<PersistentWatcherCacheListener> listeners = new ListenerContainer<>(); - private final ConcurrentMap<String, ChildData> cache = new ConcurrentHashMap<>(); - private final AtomicReference<PersistentWatcherCacheFilter> cacheFilter = new AtomicReference<>(defaultCacheFilter); - - private static final PersistentWatcherCacheFilter defaultCacheFilter = new PersistentWatcherCacheFilter() - { - @Override - public Action actionForPath(String path) - { - return Action.IGNORE; - } - }; - private final CuratorFramework client; - - private enum State - { - LATENT, - STARTED, - CLOSED - } - - private final Watcher watcher = new Watcher() - { - @Override - public void process(final WatchedEvent event) - { - processEvent(event); - } - }; - - public PersistentWatcherCache(CuratorFramework client, String basePath) - { - this.client = Objects.requireNonNull(client, "client cannot be null"); - persistentWatcher = new PersistentWatcher(client, basePath) - { - @Override - protected void reset() - { - super.reset(); - refreshData(); - } - }; - } - - public void start() - { - Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); - persistentWatcher.getListenable().addListener(watcher); - persistentWatcher.start(); - } - - @Override - public void close() - { - if ( state.compareAndSet(State.STARTED, State.CLOSED) ) - { - persistentWatcher.close(); - } - } - - public Listenable<PersistentWatcherCacheListener> getListenable() - { - return listeners; - } - - public void setCacheFilter(PersistentWatcherCacheFilter cacheFilter) - { - this.cacheFilter.set(Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null")); - } - - /** - * Return the current data. There are no guarantees of accuracy. This is - * merely the most recent view of the data. 
- * - * @return list of children and data - */ - public Map<String, ChildData> getCurrentData() - { - return ImmutableMap.copyOf(cache); - } - - /** - * Return the current data for the given path. There are no guarantees of accuracy. This is - * merely the most recent view of the data. If there is no child with that path, <code>null</code> - * is returned. - * - * @param fullPath full path to the node to check - * @return data or null - */ - public ChildData getCurrentData(String fullPath) - { - return cache.get(fullPath); - } - - /** - * As a memory optimization, you can clear the cached data bytes for a node. Subsequent - * calls to {@link ChildData#getData()} for this node will return <code>null</code>. - * - * @param fullPath the path of the node to clear - */ - public void clearDataBytes(String fullPath) - { - clearDataBytes(fullPath, -1); - } - - /** - * As a memory optimization, you can clear the cached data bytes for a node. Subsequent - * calls to {@link ChildData#getData()} for this node will return <code>null</code>. - * - * @param fullPath the path of the node to clear - * @param ifVersion if non-negative, only clear the data if the data's version matches this version - * @return true if the data was cleared - */ - public boolean clearDataBytes(String fullPath, int ifVersion) - { - ChildData data = cache.get(fullPath); - if ( data != null ) - { - if ( (ifVersion < 0) || ((data.getStat() != null) && (ifVersion == data.getStat().getVersion())) ) - { - if ( data.getData() != null ) - { - cache.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null)); - } - return true; - } - } - return false; - } - - /** - * Clears the current data without beginning a new query and without generating any events - * for listeners. 
- */ - public void clear() - { - cache.clear(); - } - - public void refreshData() - { - for ( String path : cache.keySet() ) - { - PersistentWatcherCacheFilter.Action action = cacheFilter.get().actionForPath(path); - if ( (action == PersistentWatcherCacheFilter.Action.GET_DATA_THEN_SAVE) || (action == PersistentWatcherCacheFilter.Action.SAVE_THEN_GET_DATA) ) - { - getData(path); - } - } - } - - private void processEvent(final WatchedEvent event) - { - switch ( event.getType() ) - { - default: - { - // ignore - break; - } - - case NodeCreated: - case NodeDataChanged: - { - updateNode(event.getPath()); - break; - } - - case NodeDeleted: - { - deleteNode(event.getPath()); - break; - } - } - } - - private void deleteNode(final String path) - { - if ( cache.remove(path) != null ) - { - Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>() - { - @Override - public Void apply(PersistentWatcherCacheListener listener) - { - listener.nodeDeleted(path); - return null; - } - }; - listeners.forEach(proc); - } - } - - private void updateNode(final String path) - { - boolean putAndCallListeners; - boolean doGetData; - switch ( cacheFilter.get().actionForPath(path) ) - { - default: - case IGNORE: - { - putAndCallListeners = doGetData = false; - break; - } - - case SAVE_ONLY: - { - putAndCallListeners = true; - doGetData = false; - break; - } - - case SAVE_THEN_GET_DATA: - { - putAndCallListeners = doGetData = true; - break; - } - - case GET_DATA_THEN_SAVE: - { - doGetData = true; - putAndCallListeners = false; - break; - } - } - - if ( putAndCallListeners ) - { - final ChildData oldData = cache.put(path, new ChildData(path, null, null)); - Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>() - { - @Override - public Void apply(PersistentWatcherCacheListener listener) - { - if ( oldData == null ) - { - listener.nodeCreated(path); - } - else - { - 
listener.nodeDataChanged(path); - } - return null; - } - }; - listeners.forEach(proc); - } - - if ( doGetData ) - { - getData(path); - } - } - - private void getData(String path) - { - try - { - BackgroundCallback callback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, final CuratorEvent event) throws Exception - { - if ( event.getType() == CuratorEventType.GET_DATA ) - { - ChildData newData = new ChildData(event.getPath(), event.getStat(), event.getData()); - ChildData oldData = cache.put(event.getPath(), newData); - if ( !newData.equals(oldData) ) - { - Function<PersistentWatcherCacheListener, Void> proc = new Function<PersistentWatcherCacheListener, Void>() - { - @Override - public Void apply(PersistentWatcherCacheListener listener) - { - listener.nodeDataAvailable(event.getPath()); - return null; - } - }; - listeners.forEach(proc); - } - } - } - }; - client.getData().inBackground().forPath(path); - } - catch ( Exception e ) - { - // TODO - } - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java deleted file mode 100644 index b6dc694..0000000 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheFilter.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.apache.curator.framework.recipes.cache; - -public interface PersistentWatcherCacheFilter -{ - enum Action - { - IGNORE, - SAVE_ONLY, - SAVE_THEN_GET_DATA, - GET_DATA_THEN_SAVE - } - - Action actionForPath(String path); -} 
http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java deleted file mode 100644 index 7a01fd4..0000000 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PersistentWatcherCacheListener.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.curator.framework.recipes.cache; - -public interface PersistentWatcherCacheListener -{ - void nodeCreated(String path); - - void nodeDeleted(String path); - - void nodeDataChanged(String path); - - void nodeDataAvailable(String path); -} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index ed32223..50e3fd6 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -31,6 +31,7 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.watch.CuratorCache; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; 
import org.apache.curator.utils.PathUtils; @@ -67,7 +68,10 @@ import static org.apache.curator.utils.PathUtils.validatePath; * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must * be prepared for false-positives and false-negatives. Additionally, always use the version number * when updating data to avoid overwriting another process' change.</p> + * + * @deprecated use {@link CuratorCache} */ +@Deprecated public class TreeCache implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class); http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java deleted file mode 100644 index 9324d55..0000000 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentWatcher.java +++ /dev/null @@ -1,104 +0,0 @@ -package org.apache.curator.framework.recipes.nodes; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.listen.ListenerContainer; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import java.io.Closeable; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; - -public class PersistentWatcher implements Closeable -{ - private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); - private final 
ListenerContainer<Watcher> listeners = new ListenerContainer<>(); - private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() - { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if ( newState.isConnected() ) - { - reset(); - } - } - }; - private final Watcher watcher = new Watcher() - { - @Override - public void process(final WatchedEvent event) - { - Function<Watcher, Void> function = new Function<Watcher, Void>() - { - @Override - public Void apply(Watcher watcher) - { - watcher.process(event); - return null; - } - }; - listeners.forEach(function); - } - }; - private final CuratorFramework client; - private final String basePath; - - private enum State - { - LATENT, - STARTED, - CLOSED - } - - public PersistentWatcher(CuratorFramework client, String basePath) - { - this.client = Objects.requireNonNull(client, "client cannot be null"); - this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null"); - } - - public void start() - { - Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); - client.getConnectionStateListenable().addListener(connectionStateListener); - reset(); - } - - @Override - public void close() - { - if ( state.compareAndSet(State.STARTED, State.CLOSED) ) - { - client.getConnectionStateListenable().removeListener(connectionStateListener); - try - { - client.watches().remove(watcher).inBackground().forPath(basePath); - } - catch ( Exception e ) - { - // TODO - } - } - } - - public Listenable<Watcher> getListenable() - { - return listeners; - } - - protected void reset() - { - try - { - client.addPersistentWatch().inBackground().usingWatcher(watcher).forPath(basePath); - } - catch ( Exception e ) - { - // TODO - } - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java 
---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java new file mode 100644 index 0000000..77508c1 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; + +public enum CacheAction +{ + IGNORE, + DO_NOT_GET_DATA, + GET_DATA, + GET_COMPRESSED +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java new file mode 100644 index 0000000..8094df2 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheEventType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; + +public enum CacheEventType +{ + NODE_CREATED, + NODE_DELETED, + NODE_CHANGED, + REFRESHED +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java new file mode 100644 index 0000000..9923174 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; +/** + * Strategy that chooses, per node, what a cache stores. {@link #actionForPath(String)} is given + * a node's path and returns the {@link CacheAction} the cache applies to that node. + */ +public interface CacheFilter +{ + CacheAction actionForPath(String path); +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java new file mode 100644 index 0000000..9f36042 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheListener.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; +/** + * Receives cache events. {@link #process(CacheEventType, String)} is called with the kind of + * event and the path the event applies to. + */ +public interface CacheListener +{ + void process(CacheEventType eventType, String path); +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java new file mode 100644 index 0000000..0f438a2 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; + +import org.apache.zookeeper.data.Stat; +import java.util.Arrays; +import java.util.Objects; + +/** + * Pairing of a node's {@link Stat} with its data bytes, as stored by the caches in this + * package. Neither part is ever {@code null}; a node cached without data carries an + * empty array. + */ +public class CachedNode +{ + private static final byte[] emptyData = new byte[0]; + + private final Stat stat; + private final byte[] data; + + /** + * Creates a placeholder node holding a default-constructed {@link Stat} and empty data. + */ + public CachedNode() + { + this(new Stat(), emptyData); + } + + /** + * Creates a node that has a stat but empty data. + * + * @param stat the node's stat; must not be null + */ + public CachedNode(Stat stat) + { + this(stat, emptyData); + } + + /** + * @param stat the node's stat; must not be null + * @param data the node's data; must not be null + * @throws NullPointerException if either argument is null + */ + public CachedNode(Stat stat, byte[] data) + { + this.stat = Objects.requireNonNull(stat, "stat cannot be null"); + this.data = Objects.requireNonNull(data, "data cannot be null"); + } + + /** + * @return the stat given at construction + */ + public Stat getStat() + { + return stat; + } + + /** + * @return the data array given at construction; it is returned as-is, not copied + */ + public byte[] getData() + { + return data; + } + + /** + * Two nodes are equal when they are of the same class, their stats are equal and their + * data arrays have the same contents. + */ + @Override + public boolean equals(Object o) + { + if ( o == this ) + { + return true; + } + if ( (o == null) || (o.getClass() != getClass()) ) + { + return false; + } + + CachedNode other = (CachedNode)o; + return stat.equals(other.stat) && Arrays.equals(data, other.data); + } + + @Override + public int hashCode() + { + return 31 * stat.hashCode() + Arrays.hashCode(data); + } + + @Override + public String toString() + { + StringBuilder str = new StringBuilder(); + str.append("CachedNode{").append("stat=").append(stat); + str.append(", data=").append(Arrays.toString(data)).append('}'); + return str.toString(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java new file mode 100644 index 0000000..131cc2e --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +import org.apache.curator.framework.listen.Listenable; +import java.io.Closeable; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * General interface for client-cached nodes. Create instances + * using {@link CuratorCacheBuilder}. {@link #close()} is redeclared + * here so that it does not declare a checked exception. + */ +public interface CuratorCache extends Closeable +{ + /** + * Start the cache + */ + void start(); + + @Override + void close(); + + /** + * Get listenable container used to add/remove listeners + * + * @return listener container + */ + Listenable<CacheListener> getListenable(); + + /** + * Refresh all cached nodes and send {@link CacheEventType#REFRESHED} when completed + */ + void refreshAll(); + + /** + * Refresh the given cached node + * + * @param path node full path + */ + void refresh(String path); + + /** + * Remove the given path from the cache. 
+ * + * @param path node full path + * @return true if the node was in the cache + */ + boolean clear(String path); + + /** + * Remove all nodes from the cache + */ + void clearAll(); + + /** + * Return true if there is a cached node at the given path + * + * @param path node full path + * @return true/false + */ + boolean exists(String path); + + /** + * Returns the set of paths in the cache. The returned set behaves in the same manner + * as {@link ConcurrentHashMap#keySet()} + * + * @return set of paths + */ + Set<String> paths(); + + /** + * Returns the collection of node values in the cache. The returned collection behaves in the same manner + * as {@link ConcurrentHashMap#values()} + * + * @return node values + */ + Collection<CachedNode> nodes(); + + /** + * Returns the set of node entries in the cache. The returned set behaves in the same manner + * as {@link ConcurrentHashMap#entrySet()} + * + * @return node entries + */ + Set<Map.Entry<String, CachedNode>> entries(); + + /** + * As a memory optimization, you can clear the cached data bytes for a node. Subsequent + * calls to {@link CachedNode#getData()} for this node will return an empty array + * (a {@link CachedNode} never holds <code>null</code> data). + * + * @param path the path of the node to clear + */ + void clearDataBytes(String path); + + /** + * As a memory optimization, you can clear the cached data bytes for a node. Subsequent + * calls to {@link CachedNode#getData()} for this node will return an empty array. 
+ * + * @param path the path of the node to clear + * @param ifVersion if non-negative, only clear the data if the data's version matches this version + * @return true if the data was cleared + */ + boolean clearDataBytes(String path, int ifVersion); +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java new file mode 100644 index 0000000..0affa18 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; + +import com.google.common.cache.Cache; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Shared storage half of the {@link CuratorCache} implementations: every query and removal + * method of the interface is answered from the map view of a Guava {@link Cache} keyed by + * node path. Subclasses supply the start/close/refresh/listener behavior. + */ +abstract class CuratorCacheBase implements CuratorCache +{ + protected final Cache<String, CachedNode> cache; + + /** + * @param cache backing storage; must not be null + */ + protected CuratorCacheBase(Cache<String, CachedNode> cache) + { + this.cache = Objects.requireNonNull(cache, "cache cannot be null"); + } + + /** + * Removes the entry for the path. + * + * @param path node full path + * @return true if an entry was present + */ + @Override + public final boolean clear(String path) + { + CachedNode removed = cache.asMap().remove(path); + return removed != null; + } + + /** + * Invalidates every entry of the backing cache. + */ + @Override + public final void clearAll() + { + cache.invalidateAll(); + } + + @Override + public final boolean exists(String path) + { + return cache.asMap().containsKey(path); + } + + @Override + public final Set<String> paths() + { + return cache.asMap().keySet(); + } + + @Override + public final Collection<CachedNode> nodes() + { + return cache.asMap().values(); + } + + @Override + public final Set<Map.Entry<String, CachedNode>> entries() + { + return cache.asMap().entrySet(); + } + + /** + * Unconditional form of {@link #clearDataBytes(String, int)}: drops the data bytes of the + * node at the path whatever its version. 
+ * + * @param path the path of the node to clear + */ + @Override + public final void clearDataBytes(String path) + { + clearDataBytes(path, -1); + } + + /** + * Replaces the cached node at the path with one that keeps the same stat but has empty + * data, so later {@link CachedNode#getData()} calls for it return an empty array. + * + * @param path the path of the node to clear + * @param ifVersion if non-negative, only clear the data if the data's version matches this version + * @return true if the data was cleared; false if nothing is cached at the path, the version + * did not match, or the entry was changed concurrently + */ + @Override + public final boolean clearDataBytes(String path, int ifVersion) + { + CachedNode current = cache.asMap().get(path); + if ( current == null ) + { + return false; + } + + boolean versionAccepted = (ifVersion < 0) || ((current.getStat() != null) && (current.getStat().getVersion() == ifVersion)); + if ( !versionAccepted ) + { + return false; + } + + // conditional replace: succeeds only if the entry is still the one examined above + CachedNode withoutData = new CachedNode(current.getStat()); + return cache.asMap().replace(path, current, withoutData); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java new file mode 100644 index 0000000..8ee707b --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; + +import com.google.common.cache.CacheBuilder; +import org.apache.curator.framework.CuratorFramework; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Fluent builder for {@link CuratorCache} instances. Begin with {@link #forPath(String)} or + * {@link #forNode(String)}, optionally adjust the backing Guava cache and the + * {@link CacheFilter}, then call {@link #build(CuratorFramework)}. + */ +public class CuratorCacheBuilder +{ + // filter in effect until withCacheFilter() replaces it: cache every node without fetching data + private static final CacheFilter defaultFilter = new CacheFilter() + { + @Override + public CacheAction actionForPath(String path) + { + return CacheAction.DO_NOT_GET_DATA; + } + }; + + private CacheFilter cacheFilter = defaultFilter; + private CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder(); + private String path; + private boolean singleNode; + + /** + * Starts a builder whose {@link #build(CuratorFramework)} produces the multi-node cache + * implementation for the given path. + * + * @param path the path; must not be null + * @return new builder + */ + public static CuratorCacheBuilder forPath(String path) + { + return create(path, false); + } + + /** + * Starts a builder whose {@link #build(CuratorFramework)} produces the single-node cache + * implementation for the given path. + * + * @param path the path; must not be null + * @return new builder + */ + public static CuratorCacheBuilder forNode(String path) + { + return create(path, true); + } + + /** + * Creates the cache described by this builder. + * + * @param client the client the cache will use + * @return new, not yet started cache + */ + public CuratorCache build(CuratorFramework client) + { + if ( !singleNode ) + { + return new InternalCuratorCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build()); + } + return new InternalNodeCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build()); + } + + /** + * Applies {@link CacheBuilder#weakValues()} to the backing cache. + * + * @return this + */ + public CuratorCacheBuilder usingWeakValues() + { + cacheBuilder = cacheBuilder.weakValues(); + return this; + } + + /** + * Applies {@link CacheBuilder#softValues()} to the backing cache. + * + * @return this + */ + public CuratorCacheBuilder usingSoftValues() + { + cacheBuilder = cacheBuilder.softValues(); + return this; + } + + /** + * Applies {@link CacheBuilder#expireAfterWrite(long, TimeUnit)} to the backing cache. + * + * @param duration expiry duration + * @param unit unit of the duration + * @return this + */ + public CuratorCacheBuilder thatExpiresAfterWrite(long duration, TimeUnit unit) + { + cacheBuilder = cacheBuilder.expireAfterWrite(duration, unit); + return this; + } + + /** + * Applies {@link CacheBuilder#expireAfterAccess(long, TimeUnit)} to the backing cache. + * + * @param duration expiry duration + * @param unit unit of the duration + * @return this + */ + public CuratorCacheBuilder thatExpiresAfterAccess(long duration, TimeUnit unit) + { + cacheBuilder = cacheBuilder.expireAfterAccess(duration, unit); + return this; + } + + /** + * Replaces the default filter. + * + * @param cacheFilter the filter to use; must not be null + * @return this + */ + public CuratorCacheBuilder withCacheFilter(CacheFilter cacheFilter) 
+ { + this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null"); + return this; + } + + // common body of the two static factories + private static CuratorCacheBuilder create(String path, boolean singleNode) + { + CuratorCacheBuilder builder = new CuratorCacheBuilder(); + builder.path = Objects.requireNonNull(path, "path cannot be null"); + builder.singleNode = singleNode; + return builder; + } + + private CuratorCacheBuilder() + { + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java new file mode 100644 index 0000000..08006a1 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.utils.ThreadUtils; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Multi-node cache. A {@link PersistentWatcher} on the base path feeds watch events into + * {@link #process(WatchedEvent)}: created/changed nodes are refreshed according to the + * {@link CacheFilter} and deleted nodes are dropped from the cache. + */ +class InternalCuratorCache extends CuratorCacheBase implements Watcher +{ + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final PersistentWatcher watcher; + private final CuratorFramework client; + private final String basePath; + private final CacheFilter cacheFilter; + private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>(); + private static final CachedNode nullNode = new CachedNode(); + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + /** + * @param client the client; must not be null + * @param path base path handed to the persistent watcher + * @param cacheFilter chooses the action for each path; must not be null + * @param cache backing storage for the cached nodes + */ + InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, Cache<String, CachedNode> cache) + { + super(cache); + this.client = Objects.requireNonNull(client, "client cannot be null"); + basePath = path; + this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null"); + watcher = new PersistentWatcher(client, path) + { + @Override + protected void watcherSet() + { + refreshAll(); + } + }; + watcher.getListenable().addListener(this); + } + + /** + * Starts the persistent watcher. + * + * @throws IllegalStateException if the cache was already started + */ + @Override + public void start() + { 
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started"); + watcher.start(); + } + + /** + * Detaches from the watcher, drops all listeners and closes the watcher. Only has an + * effect on a started cache. + */ + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + watcher.getListenable().removeListener(this); + listeners.clear(); + watcher.close(); + } + } + + @Override + public Listenable<CacheListener> getListenable() + { + return listeners; + } + + /** + * Watch callback: removes deleted nodes (notifying {@code NODE_DELETED} if one was cached) + * and refreshes created or changed nodes. Other event types are ignored. + */ + @Override + public void process(WatchedEvent event) + { + switch ( event.getType() ) + { + default: + { + // NOP + break; + } + + case NodeDeleted: + { + if ( cache.asMap().remove(event.getPath()) != null ) + { + notifyListeners(CacheEventType.NODE_DELETED, event.getPath()); + } + break; + } + + case NodeCreated: + case NodeDataChanged: + { + refresh(event.getPath()); + break; + } + } + } + + /** + * Refreshes every path currently in the cache and sends {@code REFRESHED} for the base + * path once each of them has been handled. + */ + @Override + public void refreshAll() + { + Set<String> keySet = new HashSet<>(cache.asMap().keySet()); + if ( keySet.isEmpty() ) + { + // nothing is pending, so the refresh is complete immediately + notifyListeners(CacheEventType.REFRESHED, basePath); + return; + } + + AtomicInteger counter = new AtomicInteger(keySet.size()); + for ( String path : keySet ) + { + internalRefresh(path, counter); + } + } + + @Override + public void refresh(String path) + { + internalRefresh(path, null); + } + + /** + * Applies the filter's action to one path. + * + * @param path the path to refresh + * @param counter number of paths still outstanding in a {@link #refreshAll()}, or null when + * a single path is refreshed; it is counted down exactly once per path, + * whether the path is ignored, handled synchronously, fails to be submitted + * or completes in the background + */ + private void internalRefresh(final String path, final AtomicInteger counter) + { + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + try + { + boolean isSuccess = (event.getResultCode() == org.apache.zookeeper.KeeperException.Code.OK.intValue()) && (event.getStat() != null); + if ( (event.getType() == CuratorEventType.GET_DATA) && isSuccess ) + { + // a node may hold no data; CachedNode rejects null so use its empty-data form + byte[] bytes = event.getData(); + CachedNode newNode = (bytes != null) ? new CachedNode(event.getStat(), bytes) : new CachedNode(event.getStat()); + CachedNode oldNode = cache.asMap().put(path, newNode); + if ( oldNode == null ) + { + notifyListeners(CacheEventType.NODE_CREATED, path); + } + else if ( !newNode.equals(oldNode) ) + { + notifyListeners(CacheEventType.NODE_CHANGED, path); + } + } + } + finally + { + refreshCompleted(counter); + } + } + }; + + // true once a background read owns the count-down for this path + boolean isPending = false; + switch ( cacheFilter.actionForPath(path) ) + { + case IGNORE: + { + // NOP + break; + } + + case DO_NOT_GET_DATA: + { + if ( cache.asMap().put(path, nullNode) == null ) + { + notifyListeners(CacheEventType.NODE_CREATED, path); + } + break; + } + + case GET_DATA: + { + try + { + client.getData().inBackground(callback).forPath(path); + isPending = true; + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + // TODO + } + break; + } + + case GET_COMPRESSED: + { + try + { + client.getData().decompressed().inBackground(callback).forPath(path); + isPending = true; + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + // TODO + } + break; + } + } + + if ( !isPending ) + { + refreshCompleted(counter); + } + } + + // counts one path of a refreshAll() as done and sends REFRESHED when it was the last one + private void refreshCompleted(AtomicInteger counter) + { + if ( (counter != null) && (counter.decrementAndGet() == 0) ) + { + notifyListeners(CacheEventType.REFRESHED, basePath); + } + } + + private void notifyListeners(final CacheEventType eventType, final String path) + { + Function<CacheListener, Void> proc = new Function<CacheListener, Void>() + { + @Override + public Void apply(CacheListener listener) + { + listener.process(eventType, path); + return null; + } + }; + listeners.forEach(proc); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java new file mode 100644 index 0000000..12571de --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.PathUtils; +import org.apache.curator.utils.ThreadUtils; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Objects; +import java.util.concurrent.Exchanger; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +class InternalNodeCache extends CuratorCacheBase +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final WatcherRemoveCuratorFramework client; + private final String path; + private final CacheFilter cacheFilter; + private final AtomicReference<CachedNode> data = new AtomicReference<>(null); + private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT); + private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>(); + private final AtomicBoolean isConnected = new AtomicBoolean(true); + private final AtomicBoolean resetEventNeeded = new AtomicBoolean(true); + private static final CachedNode nullNode = new CachedNode(); + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) + { + if ( isConnected.compareAndSet(false, true) ) + { + try + { + reset(); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.error("Trying to reset after reconnection", e); + } + } + } + else + { + isConnected.set(false); + } + } + }; + + private Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + try + { + reset(); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + // TODO + } + } + }; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + private final BackgroundCallback backgroundCallback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + try + { + processBackgroundResult(event); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + // TODO + } + + if ( resetEventNeeded.compareAndSet(true, false) ) + { + notifyListeners(CacheEventType.REFRESHED); + } + } + }; + + InternalNodeCache(CuratorFramework client, String path, CacheFilter cacheFilter, Cache<String, CachedNode> cache) + { + super(cache); + this.client = client.newWatcherRemoveCuratorFramework(); + this.path = PathUtils.validatePath(path); + this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null"); + } + + @Override + public void start() + { + 
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started"); + + client.getConnectionStateListenable().addListener(connectionStateListener); + refreshAll(); + } + + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + client.removeWatchers(); + listeners.clear(); + client.getConnectionStateListenable().removeListener(connectionStateListener); + } + } + + @Override + public Listenable<CacheListener> getListenable() + { + return listeners; + } + + @Override + public void refreshAll() + { + try + { + resetEventNeeded.set(true); + reset(); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + // TODO + } + } + + @Override + public void refresh(String path) + { + Preconditions.checkArgument(this.path.equals(path), "Bad path: " + path); + refreshAll(); + } + + private void reset() throws Exception + { + if ( (state.get() == State.STARTED) && isConnected.get() ) + { + client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); + } + } + + private void processBackgroundResult(CuratorEvent event) throws Exception + { + switch ( event.getType() ) + { + case GET_DATA: + { + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + CachedNode cachedNode = new CachedNode(event.getStat(), event.getData()); + setNewData(cachedNode); + } + break; + } + + case EXISTS: + { + if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + setNewData(null); + } + else if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + switch ( cacheFilter.actionForPath(path) ) + { + default: + case IGNORE: + { + throw new UnsupportedOperationException("Single node cache does not support action: IGNORE"); + } + + case DO_NOT_GET_DATA: + { + setNewData(nullNode); + break; + } + + case GET_DATA: + { + client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); + break; + } + + case GET_COMPRESSED: + { + 
client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); + break; + } + } + } + break; + } + } + } + + @VisibleForTesting + volatile Exchanger<Object> rebuildTestExchanger; + private void setNewData(CachedNode newData) throws InterruptedException + { + CachedNode previousData = data.getAndSet(newData); + if ( newData == null ) + { + notifyListeners(CacheEventType.NODE_DELETED); + } + else if ( previousData == null ) + { + notifyListeners(CacheEventType.NODE_CREATED); + } + else if ( !previousData.equals(newData) ) + { + notifyListeners(CacheEventType.NODE_CHANGED); + } + + if ( rebuildTestExchanger != null ) + { + try + { + rebuildTestExchanger.exchange(new Object()); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + } + } + + private void notifyListeners(final CacheEventType event) + { + listeners.forEach + ( + new Function<CacheListener, Void>() + { + @Override + public Void apply(CacheListener listener) + { + try + { + listener.process(event, path); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.error("Calling listener", e); + } + return null; + } + } + ); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java new file mode 100644 index 0000000..2f136a6 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Keeps a persistent watch registered on a base path and forwards every watch event to the + * {@link Watcher}s added through {@link #getListenable()}. The watch is (re-)added on start + * and whenever the connection state reports connected; subclasses may override + * {@link #watcherSet()} to react once the watch has been added successfully. + */ +public class PersistentWatcher implements Closeable +{ + private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final ListenerContainer<Watcher> listeners = new ListenerContainer<>(); + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState.isConnected() ) + { + reset(); + } + } + }; + private final Watcher watcher = new Watcher() + { + @Override 
+ public void process(final WatchedEvent event) + { + Function<Watcher, Void> function = new Function<Watcher, Void>() + { + @Override + public Void apply(Watcher watcher) + { + watcher.process(event); + return null; + } + }; + listeners.forEach(function); + } + }; + private final CuratorFramework client; + private final String basePath; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + /** + * @param client the client; must not be null + * @param basePath path to watch; must not be null + */ + public PersistentWatcher(CuratorFramework client, String basePath) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null"); + } + + /** + * Registers for connection state changes and adds the watch. + * + * @throws IllegalStateException if already started + */ + public void start() + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); + client.getConnectionStateListenable().addListener(connectionStateListener); + reset(); + } + + /** + * Stops reacting to connection state changes and removes the watch in the background. + * Only has an effect on a started instance. + */ + @Override + public void close() + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + client.getConnectionStateListenable().removeListener(connectionStateListener); + try + { + client.watches().remove(watcher).inBackground().forPath(basePath); + } + catch ( Exception e ) + { + // keep the interrupt status if the failure was an interruption + org.apache.curator.utils.ThreadUtils.checkInterrupted(e); + // TODO + } + } + } + + /** + * @return container used to add/remove the watchers that receive the events + */ + public Listenable<Watcher> getListenable() + { + return listeners; + } + + // adds the persistent watch in the background; watcherSet() is called when that succeeds + private void reset() + { + if ( state.get() != State.STARTED ) + { + // not started or already closed: do not (re-)add the watch + return; + } + + try + { + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( event.getResultCode() == 0 ) + { + watcherSet(); + } + } + }; + // the callback must be handed to inBackground(), otherwise the result is never delivered to it + client.addPersistentWatch().inBackground(callback).usingWatcher(watcher).forPath(basePath); + } + catch ( Exception e ) + { + // keep the interrupt status if the failure was an interruption + org.apache.curator.utils.ThreadUtils.checkInterrupted(e); + // TODO + } + } + + /** + * Called after the persistent watch was added successfully. + */ + protected void watcherSet() + { + // default is NOP + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java ---------------------------------------------------------------------- diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java new file mode 100644 index 0000000..1fbe255 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.watch; + +public class SingleLevelCacheFilter implements CacheFilter +{ + private final String levelPath; + private final CacheAction defaultAction; + + public SingleLevelCacheFilter(String levelPath) + { + this(levelPath, CacheAction.GET_DATA); + } + + public SingleLevelCacheFilter(String levelPath, CacheAction defaultAction) + { + this.levelPath = levelPath; + this.defaultAction = defaultAction; + } + + @Override + public CacheAction actionForPath(String path) + { + if ( levelPath.equals(path) ) + { + return actionForMatchedPath(); + } + return CacheAction.IGNORE; + } + + protected CacheAction actionForMatchedPath() + { + return defaultAction; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/94a0205d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java new file mode 100644 index 0000000..d3c7fac --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +public class StatsOnlyCacheFilter implements CacheFilter +{ + @Override + public CacheAction actionForPath(String path) + { + return CacheAction.DO_NOT_GET_DATA; + } +}