Repository: curator
Updated Branches:
  refs/heads/CURATOR-176 37dc44785 -> 11dad7983


Use NodeCache instead of manually watching. It's safer and more Curator-like


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/11dad798
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/11dad798
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/11dad798

Branch: refs/heads/CURATOR-176
Commit: 11dad79838b4dc00d6a5ebc59c55fe124f55d22c
Parents: 37dc447
Author: randgalt <randg...@apache.org>
Authored: Mon Jan 12 12:29:44 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Mon Jan 12 12:29:44 2015 -0500

----------------------------------------------------------------------
 .../discovery/details/ServiceDiscoveryImpl.java | 92 ++++++++++----------
 1 file changed, 48 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/11dad798/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index ca8eabe..3a92e7a 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -25,10 +25,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
@@ -44,7 +42,6 @@ import org.apache.curator.x.discovery.ServiceType;
 import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,9 +61,9 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     private final String basePath;
     private final InstanceSerializer<T> serializer;
     private final Map<String, ServiceInstance<T>> services = 
Maps.newConcurrentMap();
+    private final Map<String, NodeCache> watchedServices;
     private final Collection<ServiceCache<T>> caches = 
Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
     private final Collection<ServiceProvider<T>> providers = 
Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
-    private final boolean watchInstances;
     private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
     {
         @Override
@@ -96,13 +93,13 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
      */
     public ServiceDiscoveryImpl(CuratorFramework client, String basePath, 
InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean 
watchInstances)
     {
-        this.watchInstances = watchInstances;
         this.client = Preconditions.checkNotNull(client, "client cannot be 
null");
         this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot 
be null");
         this.serializer = Preconditions.checkNotNull(serializer, "serializer 
cannot be null");
+        watchedServices = watchInstances ? Maps.<String, 
NodeCache>newConcurrentMap() : null;
         if ( thisInstance != null )
         {
-            services.put(thisInstance.getId(), thisInstance);
+            setService(thisInstance);
         }
     }
 
@@ -129,6 +126,13 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
         {
             CloseableUtils.closeQuietly(provider);
         }
+        if ( watchedServices != null )
+        {
+            for ( NodeCache nodeCache : watchedServices.values() )
+            {
+                CloseableUtils.closeQuietly(nodeCache);
+            }
+        }
 
         Iterator<ServiceInstance<T>> it = services.values().iterator();
         while ( it.hasNext() )
@@ -171,7 +175,7 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public void registerService(ServiceInstance<T> service) throws Exception
     {
-        services.put(service.getId(), service);
+        setService(service);
         internalRegisterService(service);
     }
 
@@ -197,10 +201,6 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
             {
                 CreateMode      mode = (service.getServiceType() == 
ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
                 
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes);
-                if ( watchInstances )
-                {
-                    resetWatchedInstance(service);
-                }
                 isDone = true;
             }
             catch ( KeeperException.NodeExistsException e )
@@ -374,37 +374,6 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
         return builder.build();
     }
 
-    private void resetWatchedInstance(final ServiceInstance<T> service) throws 
Exception
-    {
-        CuratorWatcher watcher = new CuratorWatcher()
-        {
-            @Override
-            public void process(WatchedEvent event) throws Exception
-            {
-                if ( event.getType() == 
Watcher.Event.EventType.NodeDataChanged )
-                {
-                    resetWatchedInstance(service);
-                }
-            }
-        };
-
-        BackgroundCallback callback = new BackgroundCallback()
-        {
-            @Override
-            public void processResult(CuratorFramework client, CuratorEvent 
event) throws Exception
-            {
-                if ( event.getType() == CuratorEventType.GET_DATA )
-                {
-                    ServiceInstance<T> newInstance = 
serializer.deserialize(event.getData());
-                    services.put(newInstance.getId(), newInstance);
-                }
-            }
-        };
-
-        String path = pathForInstance(service.getName(), service.getId());
-        
client.getData().usingWatcher(watcher).inBackground(callback).forPath(path);
-    }
-
     private List<String> getChildrenWatched(String path, Watcher watcher, 
boolean recurse) throws Exception
     {
         List<String>    instanceIds;
@@ -453,4 +422,39 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
             internalRegisterService(service);
         }
     }
+
+    private void setService(final ServiceInstance<T> instance)
+    {
+        services.put(instance.getId(), instance);
+        if ( watchedServices != null )
+        {
+            final NodeCache nodeCache = new NodeCache(client, 
pathForInstance(instance.getName(), instance.getId()));
+            try
+            {
+                nodeCache.start(true);
+            }
+            catch ( Exception e )
+            {
+                log.error("Could not start node cache for: " + instance, e);
+            }
+            NodeCacheListener listener = new NodeCacheListener()
+            {
+                @Override
+                public void nodeChanged() throws Exception
+                {
+                    if ( nodeCache.getCurrentData() != null )
+                    {
+                        ServiceInstance<T> newInstance = 
serializer.deserialize(nodeCache.getCurrentData().getData());
+                        services.put(newInstance.getId(), newInstance);
+                    }
+                    else
+                    {
+                        log.warn("Instance data has been deleted for: " + 
instance);
+                    }
+                }
+            };
+            nodeCache.getListenable().addListener(listener);
+            watchedServices.put(instance.getId(), nodeCache);
+        }
+    }
 }

Reply via email to