CURATOR-470

Introduces an alternate cache listener, ServiceCacheEventListener, that gives 
more detail about changes to the cache as opposed to the original version which 
merely denotes a change.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c0c0ecad
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c0c0ecad
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c0c0ecad

Branch: refs/heads/CURATOR-470
Commit: c0c0ecad59af4ca20c277790bb0e877cfb3fb5bd
Parents: 2ebd456
Author: randgalt <randg...@apache.org>
Authored: Sun Jun 24 10:48:36 2018 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sun Jun 24 10:50:31 2018 -0500

----------------------------------------------------------------------
 .../details/ServiceCacheEventListener.java      |   9 +-
 .../x/discovery/details/ServiceCacheImpl.java   | 156 +++++++++----------
 .../details/ServiceCacheListenerWrapper.java    |  53 -------
 .../curator/x/discovery/TestServiceCache.java   |  43 +++--
 4 files changed, 95 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c0c0ecad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
index e6345c4..7a3b570 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java
@@ -18,30 +18,29 @@
  */
 package org.apache.curator.x.discovery.details;
 
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.x.discovery.ServiceInstance;
 
 /**
  * Listener for events (addition/update/deletion) that happen to a service 
cache
  */
-public interface ServiceCacheEventListener<T> extends ConnectionStateListener
+public interface ServiceCacheEventListener<T>
 {
     /**
-     * Called when a new cache is added.
+     * Called when a new instance is added.
      *
      * @param added instance added
      */
     void cacheAdded(ServiceInstance<T> added);
 
     /**
-     * Called when a cache is deleted.
+     * Called when an instance is deleted.
      *
      * @param deleted instance deleted
      */
     void cacheDeleted(ServiceInstance<T> deleted);
 
     /**
-     * Called when a cache is updated.
+     * Called when an instance is updated.
      *
      * @param old old instance
      * @param updated updated instance

http://git-wip-us.apache.org/repos/asf/curator/blob/c0c0ecad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index 47449e8..715182b 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -37,7 +37,6 @@ import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
 import java.io.IOException;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -47,7 +46,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, 
PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheEventListener<T>>   
listenerContainer = new ListenerContainer<>();
+    private final ListenerContainer<ServiceCacheEventListener<T>>   
eventListenerContainer = new ListenerContainer<>();
+    private final ListenerContainer<ServiceCacheListener>           
listenerContainer = new ListenerContainer<>();
     private final ServiceDiscoveryImpl<T>                           discovery;
     private final AtomicReference<State>                            state = 
new AtomicReference<>(State.LATENT);
     private final PathChildrenCache                                 cache;
@@ -126,18 +126,19 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
         Preconditions.checkState(state.compareAndSet(State.STARTED, 
State.STOPPED), "Already closed or has not been started");
 
         listenerContainer.forEach
-            (
-                new Function<ServiceCacheEventListener<T>, Void>()
+        (
+            new Function<ServiceCacheListener, Void>()
+            {
+                @Override
+                public Void apply(ServiceCacheListener listener)
                 {
-                    @Override
-                    public Void apply(ServiceCacheEventListener<T> listener)
-                    {
-                        
discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
+                    
discovery.getClient().getConnectionStateListenable().removeListener(listener);
+                    return null;
                 }
-            );
+            }
+        );
         listenerContainer.clear();
+        eventListenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
 
@@ -147,115 +148,98 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     @Override
     public Listenable<ServiceCacheEventListener<T>> getCacheEventListenable()
     {
-        return listenerContainer;
+        return eventListenerContainer;
     }
 
     @Override
     public void addListener(ServiceCacheListener listener)
     {
-        ServiceCacheListenerWrapper<T> wrapped = 
ServiceCacheListenerWrapper.<T>wrap(listener);
-        listenerContainer.addListener(wrapped);
-        
discovery.getClient().getConnectionStateListenable().addListener(wrapped);
+        listenerContainer.addListener(listener);
+        
discovery.getClient().getConnectionStateListenable().addListener(listener);
     }
 
     @Override
     public void addListener(ServiceCacheListener listener, Executor executor)
     {
-        ServiceCacheListenerWrapper<T> wrapped = 
ServiceCacheListenerWrapper.<T>wrap(listener);
-        listenerContainer.addListener(wrapped, executor);
-        
discovery.getClient().getConnectionStateListenable().addListener(wrapped, 
executor);
+        listenerContainer.addListener(listener, executor);
+        
discovery.getClient().getConnectionStateListenable().addListener(listener, 
executor);
     }
 
     @Override
-    public void removeListener(final ServiceCacheListener listener)
+    public void removeListener(ServiceCacheListener listener)
     {
-        listenerContainer.forEach
-        (
-            new Function<ServiceCacheEventListener<T>, Void>()
-            {
-                @Override
-                public Void apply(ServiceCacheEventListener<T> eventListener)
-                {
-                    if ( 
Objects.equals(ServiceCacheListenerWrapper.unwrap(eventListener), listener) )
-                    {
-                        listenerContainer.removeListener(eventListener);
-                        
discovery.getClient().getConnectionStateListenable().removeListener(eventListener);
-                    }
-                    return null;
-                }
-            }
-        );
+        listenerContainer.removeListener(listener);
+        
discovery.getClient().getConnectionStateListenable().removeListener(listener);
     }
 
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent 
event) throws Exception
     {
+        boolean notifyListeners = false;
            switch ( event.getType() )
         {
             case CHILD_ADDED:
+            case CHILD_UPDATED:
             {
-                final Tuple<T> tuple = addOrUpdateInstance(event.getData());
-                listenerContainer.forEach
-                (
-                    new Function<ServiceCacheEventListener<T>, Void>()
-                    {
-                        @Override
-                        public Void apply(ServiceCacheEventListener<T> 
listener)
-                        {
-                            listener.cacheAdded(tuple.newInstance);
-                            return null;
-                        }
-                    }
-                );
+                notifyListeners = true;
+                applyTuple(addOrUpdateInstance(event.getData()));
                 break;
             }
 
-        case CHILD_UPDATED:
+            case CHILD_REMOVED:
             {
-                final Tuple<T> tuple = addOrUpdateInstance(event.getData());
-                listenerContainer.forEach
-                (
-                    new Function<ServiceCacheEventListener<T>, Void>()
-                    {
-                        @Override
-                        public Void apply(ServiceCacheEventListener<T> 
listener)
-                        {
-                            if ( tuple.oldInstance != null )
-                            {
-                                listener.cacheUpdated(tuple.oldInstance, 
tuple.newInstance);
-                            }
-                            else
-                            {
-                                listener.cacheAdded(tuple.newInstance);
-                            }
-                            return null;
-                        }
-                    }
-                );
+                notifyListeners = true;
+                final ServiceInstance<T> serviceInstance = 
instances.remove(instanceIdFromData(event.getData()));
+                applyTuple(new Tuple<T>(serviceInstance, null));
                 break;
             }
+        }
 
-            case CHILD_REMOVED:
+        if ( notifyListeners )
+        {
+            listenerContainer.forEach
+            (
+                new Function<ServiceCacheListener, Void>()
+                {
+                    @Override
+                    public Void apply(ServiceCacheListener listener)
+                    {
+                        listener.cacheChanged();
+                        return null;
+                    }
+                }
+            );
+        }
+    }
+
+    private void applyTuple(final Tuple<T> tuple)
+    {
+        eventListenerContainer.forEach
+        (
+            new Function<ServiceCacheEventListener<T>, Void>()
             {
-                final ServiceInstance<T> serviceInstance = 
instances.remove(instanceIdFromData(event.getData()));
-                if ( serviceInstance != null )
+                @Override
+                public Void apply(ServiceCacheEventListener<T> listener)
                 {
-                    listenerContainer.forEach
-                    (
-                        new Function<ServiceCacheEventListener<T>, Void>()
+                    if ( tuple.oldInstance != null )
+                    {
+                        if ( tuple.newInstance != null )
+                        {
+                            listener.cacheUpdated(tuple.oldInstance, 
tuple.newInstance);
+                        }
+                        else
                         {
-                            @Override
-                            public Void apply(ServiceCacheEventListener<T> 
listener)
-                            {
-                                listener.cacheDeleted(serviceInstance);
-                                return null;
-                            }
+                            listener.cacheDeleted(tuple.oldInstance);
                         }
-                    );
+                    }
+                    else if ( tuple.newInstance != null )
+                    {
+                        listener.cacheAdded(tuple.newInstance);
+                    }
+                    return null;
                 }
-                break;
             }
-        }
+        );
     }
 
     private String instanceIdFromData(ChildData childData)
@@ -282,8 +266,8 @@ public class ServiceCacheImpl<T> implements 
ServiceCache<T>, PathChildrenCacheLi
     }
 
     private static class Tuple<T> {
-        public final ServiceInstance<T> oldInstance;
-        public final ServiceInstance<T> newInstance;
+        private final ServiceInstance<T> oldInstance;
+        private final ServiceInstance<T> newInstance;
 
         private Tuple(final ServiceInstance<T> oldInstance, final 
ServiceInstance<T> newInstance) {
             this.oldInstance = oldInstance;

http://git-wip-us.apache.org/repos/asf/curator/blob/c0c0ecad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java
deleted file mode 100644
index 6f14178..0000000
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.curator.x.discovery.details;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.x.discovery.ServiceInstance;
-
-class ServiceCacheListenerWrapper<T> implements ServiceCacheEventListener<T>
-{
-    private final ServiceCacheListener listener;
-
-    static <T> ServiceCacheListenerWrapper<T> wrap(ServiceCacheListener 
listener)
-    {
-        return new ServiceCacheListenerWrapper<>(listener);
-    }
-
-    static ServiceCacheListener unwrap(ServiceCacheEventListener<?> 
eventListener)
-    {
-        if ( eventListener instanceof ServiceCacheListenerWrapper )
-        {
-            return ((ServiceCacheListenerWrapper)eventListener).listener;
-        }
-        return null;
-    }
-
-    private ServiceCacheListenerWrapper(ServiceCacheListener listener)
-    {
-        this.listener = listener;
-    }
-
-    @Override
-    public void stateChanged(CuratorFramework client, ConnectionState newState)
-    {
-        listener.stateChanged(client, newState);
-    }
-
-    @Override
-    public void cacheAdded(ServiceInstance<T> added)
-    {
-        listener.cacheChanged();
-    }
-
-    @Override
-    public void cacheDeleted(ServiceInstance<T> deleted)
-    {
-        listener.cacheChanged();
-    }
-
-    @Override
-    public void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> 
updated)
-    {
-        listener.cacheChanged();
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/c0c0ecad/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index 70b41e3..509df7b 100644
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -44,6 +44,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestServiceCache extends BaseClassForTests
 {
+    private final Timing timing = new Timing();
+
     @Test
     public void testInitialLoad() throws Exception
     {
@@ -106,8 +108,6 @@ public class TestServiceCache extends BaseClassForTests
     @Test
     public void testViaProvider() throws Exception
     {
-        Timing timing = new Timing();
-
         List<Closeable> closeables = Lists.newArrayList();
         try
         {
@@ -330,36 +330,33 @@ public class TestServiceCache extends BaseClassForTests
             ServiceCache<String> cache = 
discovery.serviceCacheBuilder().name("test").build();
             closeables.add(cache);
 
-            final CountDownLatch latch = new CountDownLatch(3);
-
-            final AtomicBoolean notifyError = new AtomicBoolean(false);
+            final Semaphore latch = new Semaphore(0);
+            final AtomicBoolean validation = new AtomicBoolean(true);
             ServiceCacheEventListener<String> listener = new 
ServiceCacheEventListener<String>()
             {
                 @Override
-                public void cacheAdded(final ServiceInstance<String> added) {
-                    latch.countDown();
-
-                    notifyError.compareAndSet(false,added == null);
-                }
-
-                @Override
-                public void cacheDeleted(final ServiceInstance<String> 
deleted) {
-                    latch.countDown();
+                public void cacheAdded(final ServiceInstance<String> added)
+                {
+                    latch.release();
 
-                    notifyError.compareAndSet(false,deleted == null);
+                    validation.compareAndSet(true,added != null);
                 }
 
                 @Override
-                public void cacheUpdated(final ServiceInstance<String> before, 
final ServiceInstance<String> after) {
-                    latch.countDown();
+                public void cacheDeleted(final ServiceInstance<String> deleted)
+                {
+                    latch.release();
 
-                    notifyError.compareAndSet(false, 
!"before".equals(before.getPayload()));
-                    notifyError.compareAndSet(false, 
!"after".equals(after.getPayload()));
+                    validation.compareAndSet(true,deleted != null);
                 }
 
                 @Override
-                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                public void cacheUpdated(final ServiceInstance<String> before, 
final ServiceInstance<String> after)
                 {
+                    latch.release();
+
+                    validation.compareAndSet(true, 
"before".equals(before.getPayload()));
+                    validation.compareAndSet(true, 
"after".equals(after.getPayload()));
                 }
             };
             cache.getCacheEventListenable().addListener(listener);
@@ -367,12 +364,14 @@ public class TestServiceCache extends BaseClassForTests
 
             ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("before").name("test").port(10064).build();
             discovery.registerService(instance);
+            Assert.assertTrue(timing.acquireSemaphore(latch));
             instance = 
ServiceInstance.<String>builder().id(instance.getId()).payload("after").name("test").port(10064).build();
             discovery.updateService(instance);
+            Assert.assertTrue(timing.acquireSemaphore(latch));
             discovery.unregisterService(instance);
-            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+            Assert.assertTrue(timing.acquireSemaphore(latch));
 
-            Assert.assertFalse(notifyError.get());
+            Assert.assertTrue(validation.get());
         }
         finally
         {

Reply via email to