Repository: curator Updated Branches: refs/heads/CURATOR-176 37dc44785 -> 11dad7983
Use NodeCache instead of manually watching. It's safer and more Curator-like Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/11dad798 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/11dad798 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/11dad798 Branch: refs/heads/CURATOR-176 Commit: 11dad79838b4dc00d6a5ebc59c55fe124f55d22c Parents: 37dc447 Author: randgalt <randg...@apache.org> Authored: Mon Jan 12 12:29:44 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Mon Jan 12 12:29:44 2015 -0500 ---------------------------------------------------------------------- .../discovery/details/ServiceDiscoveryImpl.java | 92 ++++++++++---------- 1 file changed, 48 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/11dad798/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index ca8eabe..3a92e7a 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -25,10 +25,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.CloseableUtils; @@ -44,7 +42,6 @@ import org.apache.curator.x.discovery.ServiceType; import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,9 +61,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> private final String basePath; private final InstanceSerializer<T> serializer; private final Map<String, ServiceInstance<T>> services = Maps.newConcurrentMap(); + private final Map<String, NodeCache> watchedServices; private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap()); private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap()); - private final boolean watchInstances; private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override @@ -96,13 +93,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> */ public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean watchInstances) { - this.watchInstances = watchInstances; this.client = Preconditions.checkNotNull(client, "client cannot be null"); this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null"); this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null"); + watchedServices = watchInstances ? Maps.<String, NodeCache>newConcurrentMap() : null; if ( thisInstance != null ) { - services.put(thisInstance.getId(), thisInstance); + setService(thisInstance); } } @@ -129,6 +126,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> { CloseableUtils.closeQuietly(provider); } + if ( watchedServices != null ) + { + for ( NodeCache nodeCache : watchedServices.values() ) + { + CloseableUtils.closeQuietly(nodeCache); + } + } Iterator<ServiceInstance<T>> it = services.values().iterator(); while ( it.hasNext() ) @@ -171,7 +175,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> @Override public void registerService(ServiceInstance<T> service) throws Exception { - services.put(service.getId(), service); + setService(service); internalRegisterService(service); } @@ -197,10 +201,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> { CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes); - if ( watchInstances ) - { - resetWatchedInstance(service); - } isDone = true; } catch ( KeeperException.NodeExistsException e ) @@ -374,37 +374,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> return builder.build(); } - private void resetWatchedInstance(final ServiceInstance<T> service) throws Exception - { - CuratorWatcher watcher = new CuratorWatcher() - { - @Override - public void process(WatchedEvent event) throws Exception - { - if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) - { - resetWatchedInstance(service); - } - } - }; - - BackgroundCallback callback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - if ( event.getType() == CuratorEventType.GET_DATA ) - { - ServiceInstance<T> newInstance = serializer.deserialize(event.getData()); - services.put(newInstance.getId(), newInstance); - } - } - }; - - String path = pathForInstance(service.getName(), service.getId()); - client.getData().usingWatcher(watcher).inBackground(callback).forPath(path); - } - private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception { List<String> instanceIds; @@ -453,4 +422,39 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> internalRegisterService(service); } } + + private void setService(final ServiceInstance<T> instance) + { + services.put(instance.getId(), instance); + if ( watchedServices != null ) + { + final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId())); + try + { + nodeCache.start(true); + } + catch ( Exception e ) + { + log.error("Could not start node cache for: " + instance, e); + } + NodeCacheListener listener = new NodeCacheListener() + { + @Override + public void nodeChanged() throws Exception + { + if ( nodeCache.getCurrentData() != null ) + { + ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData()); + services.put(newInstance.getId(), newInstance); + } + else + { + log.warn("Instance data has been deleted for: " + instance); + } + } + }; + nodeCache.getListenable().addListener(listener); + watchedServices.put(instance.getId(), nodeCache); + } + } }