CURATOR-470 Introduces an alternate cache listener, ServiceCacheEventListener, that gives more detail about changes to the cache as opposed to the original version which merely denotes a change.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c0c0ecad Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c0c0ecad Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c0c0ecad Branch: refs/heads/CURATOR-470 Commit: c0c0ecad59af4ca20c277790bb0e877cfb3fb5bd Parents: 2ebd456 Author: randgalt <randg...@apache.org> Authored: Sun Jun 24 10:48:36 2018 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Jun 24 10:50:31 2018 -0500 ---------------------------------------------------------------------- .../details/ServiceCacheEventListener.java | 9 +- .../x/discovery/details/ServiceCacheImpl.java | 156 +++++++++---------- .../details/ServiceCacheListenerWrapper.java | 53 ------- .../curator/x/discovery/TestServiceCache.java | 43 +++-- 4 files changed, 95 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/c0c0ecad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java index e6345c4..7a3b570 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java @@ -18,30 +18,29 @@ */ package org.apache.curator.x.discovery.details; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.x.discovery.ServiceInstance; /** * Listener for events (addition/update/deletion) that happen to a service cache */ -public interface ServiceCacheEventListener<T> extends ConnectionStateListener +public interface ServiceCacheEventListener<T> { /** - * Called when a new cache is added. + * Called when a new instance is added. * * @param added instance added */ void cacheAdded(ServiceInstance<T> added); /** - * Called when a cache is deleted. + * Called when an instance is deleted. * * @param deleted instance deleted */ void cacheDeleted(ServiceInstance<T> deleted); /** - * Called when a cache is updated. + * Called when an instance is updated. * * @param old old instance * @param updated updated instance http://git-wip-us.apache.org/repos/asf/curator/blob/c0c0ecad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index 47449e8..715182b 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -37,7 +37,6 @@ import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceInstance; import java.io.IOException; import java.util.List; -import java.util.Objects; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -47,7 +46,8 @@ import java.util.concurrent.atomic.AtomicReference; public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener { - private final ListenerContainer<ServiceCacheEventListener<T>> listenerContainer = new ListenerContainer<>(); + private final ListenerContainer<ServiceCacheEventListener<T>> eventListenerContainer = new ListenerContainer<>(); + private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<>(); private final ServiceDiscoveryImpl<T> discovery; private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); private final PathChildrenCache cache; @@ -126,18 +126,19 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started"); listenerContainer.forEach - ( - new Function<ServiceCacheEventListener<T>, Void>() + ( + new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) { - @Override - public Void apply(ServiceCacheEventListener<T> listener) - { - discovery.getClient().getConnectionStateListenable().removeListener(listener); - return null; - } + discovery.getClient().getConnectionStateListenable().removeListener(listener); + return null; } - ); + } + ); listenerContainer.clear(); + eventListenerContainer.clear(); CloseableUtils.closeQuietly(cache); @@ -147,115 +148,98 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi @Override public Listenable<ServiceCacheEventListener<T>> getCacheEventListenable() { - return listenerContainer; + return eventListenerContainer; } @Override public void addListener(ServiceCacheListener listener) { - ServiceCacheListenerWrapper<T> wrapped = ServiceCacheListenerWrapper.<T>wrap(listener); - listenerContainer.addListener(wrapped); - discovery.getClient().getConnectionStateListenable().addListener(wrapped); + listenerContainer.addListener(listener); + discovery.getClient().getConnectionStateListenable().addListener(listener); } @Override public void addListener(ServiceCacheListener listener, Executor executor) { - ServiceCacheListenerWrapper<T> wrapped = ServiceCacheListenerWrapper.<T>wrap(listener); - listenerContainer.addListener(wrapped, executor); - discovery.getClient().getConnectionStateListenable().addListener(wrapped, executor); + listenerContainer.addListener(listener, executor); + discovery.getClient().getConnectionStateListenable().addListener(listener, executor); } @Override - public void removeListener(final ServiceCacheListener listener) + public void removeListener(ServiceCacheListener listener) { - listenerContainer.forEach - ( - new Function<ServiceCacheEventListener<T>, Void>() - { - @Override - public Void apply(ServiceCacheEventListener<T> eventListener) - { - if ( Objects.equals(ServiceCacheListenerWrapper.unwrap(eventListener), listener) ) - { - listenerContainer.removeListener(eventListener); - discovery.getClient().getConnectionStateListenable().removeListener(eventListener); - } - return null; - } - } - ); + listenerContainer.removeListener(listener); + discovery.getClient().getConnectionStateListenable().removeListener(listener); } @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + boolean notifyListeners = false; switch ( event.getType() ) { case CHILD_ADDED: + case CHILD_UPDATED: { - final Tuple<T> tuple = addOrUpdateInstance(event.getData()); - listenerContainer.forEach - ( - new Function<ServiceCacheEventListener<T>, Void>() - { - @Override - public Void apply(ServiceCacheEventListener<T> listener) - { - listener.cacheAdded(tuple.newInstance); - return null; - } - } - ); + notifyListeners = true; + applyTuple(addOrUpdateInstance(event.getData())); break; } - case CHILD_UPDATED: + case CHILD_REMOVED: { - final Tuple<T> tuple = addOrUpdateInstance(event.getData()); - listenerContainer.forEach - ( - new Function<ServiceCacheEventListener<T>, Void>() - { - @Override - public Void apply(ServiceCacheEventListener<T> listener) - { - if ( tuple.oldInstance != null ) - { - listener.cacheUpdated(tuple.oldInstance, tuple.newInstance); - } - else - { - listener.cacheAdded(tuple.newInstance); - } - return null; - } - } - ); + notifyListeners = true; + final ServiceInstance<T> serviceInstance = instances.remove(instanceIdFromData(event.getData())); + applyTuple(new Tuple<T>(serviceInstance, null)); break; } + } - case CHILD_REMOVED: + if ( notifyListeners ) + { + listenerContainer.forEach + ( + new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + return null; + } + } + ); + } + } + + private void applyTuple(final Tuple<T> tuple) + { + eventListenerContainer.forEach + ( + new Function<ServiceCacheEventListener<T>, Void>() { - final ServiceInstance<T> serviceInstance = instances.remove(instanceIdFromData(event.getData())); - if ( serviceInstance != null ) + @Override + public Void apply(ServiceCacheEventListener<T> listener) { - listenerContainer.forEach - ( - new Function<ServiceCacheEventListener<T>, Void>() + if ( tuple.oldInstance != null ) + { + if ( tuple.newInstance != null ) + { + listener.cacheUpdated(tuple.oldInstance, tuple.newInstance); + } + else { - @Override - public Void apply(ServiceCacheEventListener<T> listener) - { - listener.cacheDeleted(serviceInstance); - return null; - } + listener.cacheDeleted(tuple.oldInstance); } - ); + } + else if ( tuple.newInstance != null ) + { + listener.cacheAdded(tuple.newInstance); + } + return null; } - break; } - } + ); } private String instanceIdFromData(ChildData childData) @@ -282,8 +266,8 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi } private static class Tuple<T> { - public final ServiceInstance<T> oldInstance; - public final ServiceInstance<T> newInstance; + private final ServiceInstance<T> oldInstance; + private final ServiceInstance<T> newInstance; private Tuple(final ServiceInstance<T> oldInstance, final ServiceInstance<T> newInstance) { this.oldInstance = oldInstance; http://git-wip-us.apache.org/repos/asf/curator/blob/c0c0ecad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java deleted file mode 100644 index 6f14178..0000000 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheListenerWrapper.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.apache.curator.x.discovery.details; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.x.discovery.ServiceInstance; - -class ServiceCacheListenerWrapper<T> implements ServiceCacheEventListener<T> -{ - private final ServiceCacheListener listener; - - static <T> ServiceCacheListenerWrapper<T> wrap(ServiceCacheListener listener) - { - return new ServiceCacheListenerWrapper<>(listener); - } - - static ServiceCacheListener unwrap(ServiceCacheEventListener<?> eventListener) - { - if ( eventListener instanceof ServiceCacheListenerWrapper ) - { - return ((ServiceCacheListenerWrapper)eventListener).listener; - } - return null; - } - - private ServiceCacheListenerWrapper(ServiceCacheListener listener) - { - this.listener = listener; - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - listener.stateChanged(client, newState); - } - - @Override - public void cacheAdded(ServiceInstance<T> added) - { - listener.cacheChanged(); - } - - @Override - public void cacheDeleted(ServiceInstance<T> deleted) - { - listener.cacheChanged(); - } - - @Override - public void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> updated) - { - listener.cacheChanged(); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/c0c0ecad/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index 70b41e3..509df7b 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -44,6 +44,8 @@ import java.util.concurrent.atomic.AtomicBoolean; public class TestServiceCache extends BaseClassForTests { + private final Timing timing = new Timing(); + @Test public void testInitialLoad() throws Exception { @@ -106,8 +108,6 @@ public class TestServiceCache extends BaseClassForTests @Test public void testViaProvider() throws Exception { - Timing timing = new Timing(); - List<Closeable> closeables = Lists.newArrayList(); try { @@ -330,36 +330,33 @@ public class TestServiceCache extends BaseClassForTests ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").build(); closeables.add(cache); - final CountDownLatch latch = new CountDownLatch(3); - - final AtomicBoolean notifyError = new AtomicBoolean(false); + final Semaphore latch = new Semaphore(0); + final AtomicBoolean validation = new AtomicBoolean(true); ServiceCacheEventListener<String> listener = new ServiceCacheEventListener<String>() { @Override - public void cacheAdded(final ServiceInstance<String> added) { - latch.countDown(); - - notifyError.compareAndSet(false,added == null); - } - - @Override - public void cacheDeleted(final ServiceInstance<String> deleted) { - latch.countDown(); + public void cacheAdded(final ServiceInstance<String> added) + { + latch.release(); - notifyError.compareAndSet(false,deleted == null); + validation.compareAndSet(true,added != null); } @Override - public void cacheUpdated(final ServiceInstance<String> before, final ServiceInstance<String> after) { - latch.countDown(); + public void cacheDeleted(final ServiceInstance<String> deleted) + { + latch.release(); - notifyError.compareAndSet(false, !"before".equals(before.getPayload())); - notifyError.compareAndSet(false, !"after".equals(after.getPayload())); + validation.compareAndSet(true,deleted != null); } @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) + public void cacheUpdated(final ServiceInstance<String> before, final ServiceInstance<String> after) { + latch.release(); + + validation.compareAndSet(true, "before".equals(before.getPayload())); + validation.compareAndSet(true, "after".equals(after.getPayload())); } }; cache.getCacheEventListenable().addListener(listener); @@ -367,12 +364,14 @@ public class TestServiceCache extends BaseClassForTests ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("before").name("test").port(10064).build(); discovery.registerService(instance); + Assert.assertTrue(timing.acquireSemaphore(latch)); instance = ServiceInstance.<String>builder().id(instance.getId()).payload("after").name("test").port(10064).build(); discovery.updateService(instance); + Assert.assertTrue(timing.acquireSemaphore(latch)); discovery.unregisterService(instance); - Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + Assert.assertTrue(timing.acquireSemaphore(latch)); - Assert.assertFalse(notifyError.get()); + Assert.assertTrue(validation.get()); } finally {