Repository: curator
Updated Branches:
  refs/heads/CURATOR-470 [created] c0c0ecad5


Add new service cache listener to let users know what instances have changed, 
while preserving backward compatibility.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/88e8d9a1
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/88e8d9a1
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/88e8d9a1

Branch: refs/heads/CURATOR-470
Commit: 88e8d9a15d4f1690a6df1c1cabb355936f327aaa
Parents: 9383aa3
Author: Qin Meijie <meijie....@eques.com.cn>
Authored: Fri Jun 8 13:06:15 2018 +0800
Committer: Qin Meijie <meijie....@eques.com.cn>
Committed: Fri Jun 8 13:06:15 2018 +0800

----------------------------------------------------------------------
 .../details/ServiceCacheEventListener.java      |  50 +++++++++
 .../x/discovery/details/ServiceCacheImpl.java   | 109 +++++++++++++------
 .../curator/x/discovery/TestServiceCache.java   |  80 ++++++++++++++
 3 files changed, 207 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/88e8d9a1/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
new file mode 100644
index 0000000..1f783f3
--- /dev/null
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.discovery.details;
+
+import org.apache.curator.x.discovery.ServiceInstance;
+
+/**
+ * Listener for events (addition/update/deletion) that happen to a service 
cache
+ */
+public interface ServiceCacheEventListener<T> extends ServiceCacheListener
+{
+
+    /**
+     * Called when a new cache is added.
+     *
+     * @param added instance added
+     */
+    public void cacheAdded(ServiceInstance<T> added);
+
+    /**
+     * Called when a cache is deleted.
+     *
+     * @param deleted instance deleted
+     */
+    public void cacheDeleted(ServiceInstance<T> deleted);
+
+    /**
+     * Called when a cache is updated.
+     *
+     * @param old old instance
+     * @param updated updated instance
+     */
+    public void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> 
updated);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/88e8d9a1/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index b8f39d5..df6696a 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -93,7 +93,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, 
PathChildrenCacheLi
         cache.start(true);
         for ( ChildData childData : cache.getCurrentData() )
         {
-            addInstance(childData, true);
+            addInstanceOnlyIfAbsent(childData);
         }
         discovery.cacheOpened(this);
     }
@@ -146,40 +146,72 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent 
event) throws Exception
     {
-        boolean         notifyListeners = false;
-        switch ( event.getType() )
+           final Tuple<T> tuple;
+           switch ( event.getType() )
         {
             case CHILD_ADDED:
+                   tuple = addOrUpdateInstance(event.getData());
+                listenerContainer.forEach(
+                    new Function<ServiceCacheListener, Void>()
+                    {
+                        @Override
+                        public Void apply(ServiceCacheListener listener)
+                        {
+                            listener.cacheChanged();
+
+                            if(listener instanceof  ServiceCacheEventListener) 
{
+                                ((ServiceCacheEventListener) 
listener).cacheAdded(tuple.newInstance);
+                            }
+
+                            return null;
+                        }
+                    }
+                );
+                break;
             case CHILD_UPDATED:
             {
-                addInstance(event.getData(), false);
-                notifyListeners = true;
+                   tuple = addOrUpdateInstance(event.getData());
+                   listenerContainer.forEach(
+                                   new Function<ServiceCacheListener, Void>()
+                                   {
+                                           @Override
+                                           public Void 
apply(ServiceCacheListener listener)
+                                           {
+                                                   listener.cacheChanged();
+
+                                                   if(listener instanceof  
ServiceCacheEventListener) {
+                                                           
((ServiceCacheEventListener) listener).cacheUpdated(tuple.oldInstance, 
tuple.newInstance);
+                                                   }
+
+                                                   return null;
+                                           }
+                                   }
+                   );
                 break;
             }
 
             case CHILD_REMOVED:
             {
-                instances.remove(instanceIdFromData(event.getData()));
-                notifyListeners = true;
+                final ServiceInstance<T> serviceInstance = 
instances.remove(instanceIdFromData(event.getData()));
+                   listenerContainer.forEach(
+                                   new Function<ServiceCacheListener, Void>()
+                                   {
+                                           @Override
+                                           public Void 
apply(ServiceCacheListener listener)
+                                           {
+                                                   listener.cacheChanged();
+
+                                                   if(listener instanceof 
ServiceCacheEventListener) {
+                                                           
((ServiceCacheEventListener) listener).cacheDeleted(serviceInstance);
+                                                   }
+
+                                                   return null;
+                                           }
+                                   }
+                   );
                 break;
             }
         }
-
-        if ( notifyListeners )
-        {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
-                }
-            );
-        }
     }
 
     private String instanceIdFromData(ChildData childData)
@@ -187,18 +219,31 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
         return ZKPaths.getNodeFromPath(childData.getPath());
     }
 
-    private void addInstance(ChildData childData, boolean onlyIfAbsent) throws 
Exception
+    private void addInstanceOnlyIfAbsent(ChildData childData) throws Exception
     {
         String                  instanceId = instanceIdFromData(childData);
         ServiceInstance<T>      serviceInstance = 
discovery.getSerializer().deserialize(childData.getData());
-        if ( onlyIfAbsent )
-        {
-            instances.putIfAbsent(instanceId, serviceInstance);
-        }
-        else
-        {
-            instances.put(instanceId, serviceInstance);
-        }
+
+           instances.putIfAbsent(instanceId, serviceInstance);
+        cache.clearDataBytes(childData.getPath(), 
childData.getStat().getVersion());
+    }
+
+    private Tuple<T> addOrUpdateInstance(ChildData childData) throws Exception
+    {
+        String                  instanceId = instanceIdFromData(childData);
+        ServiceInstance<T>      serviceInstance = 
discovery.getSerializer().deserialize(childData.getData());
+        final Tuple<T>          result = new Tuple<>(instances.put(instanceId, 
serviceInstance), serviceInstance);
         cache.clearDataBytes(childData.getPath(), 
childData.getStat().getVersion());
+        return result;
+    }
+
+    private static class Tuple<T> {
+        public final ServiceInstance<T> oldInstance;
+        public final ServiceInstance<T> newInstance;
+
+        private Tuple(final ServiceInstance<T> oldInstance, final 
ServiceInstance<T> newInstance) {
+            this.oldInstance = oldInstance;
+            this.newInstance = newInstance;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/88e8d9a1/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..65b7cb9 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -28,6 +28,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.details.ServiceCacheEventListener;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -39,6 +40,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestServiceCache extends BaseClassForTests
 {
@@ -310,4 +312,82 @@ public class TestServiceCache extends BaseClassForTests
             }
         }
     }
+
+    @Test
+    public void testServiceCacheEventListener() throws Exception
+    {
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
+            closeables.add(discovery);
+            discovery.start();
+
+            ServiceCache<String> cache = 
discovery.serviceCacheBuilder().name("test").build();
+            closeables.add(cache);
+
+            final CountDownLatch latch = new CountDownLatch(6);
+
+            final AtomicBoolean notifyError = new AtomicBoolean(false);
+            ServiceCacheListener listener = new 
ServiceCacheEventListener<String>()
+            {
+                @Override
+                public void cacheAdded(final ServiceInstance<String> added) {
+                    latch.countDown();
+
+                    notifyError.compareAndSet(false,added == null);
+                }
+
+                @Override
+                public void cacheDeleted(final ServiceInstance<String> 
deleted) {
+                    latch.countDown();
+
+                    notifyError.compareAndSet(false,deleted == null);
+                }
+
+                @Override
+                public void cacheUpdated(final ServiceInstance<String> before, 
final ServiceInstance<String> after) {
+                    latch.countDown();
+
+                    notifyError.compareAndSet(false, 
!"before".equals(before.getPayload()));
+                    notifyError.compareAndSet(false, 
!"after".equals(after.getPayload()));
+                }
+
+                @Override
+                public void cacheChanged()
+                {
+                    latch.countDown();
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                {
+
+                }
+            };
+            cache.addListener(listener);
+            cache.start();
+
+            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("before").name("test").port(10064).build();
+            discovery.registerService(instance);
+            instance = 
ServiceInstance.<String>builder().id(instance.getId()).payload("after").name("test").port(10064).build();
+            discovery.updateService(instance);
+            discovery.unregisterService(instance);
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+            Assert.assertFalse(notifyError.get());
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
 }

Reply via email to