Repository: curator Updated Branches: refs/heads/CURATOR-470 [created] c0c0ecad5
Add new service cache listener to let users know what instances have changed, while preserving backward compatibility. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/88e8d9a1 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/88e8d9a1 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/88e8d9a1 Branch: refs/heads/CURATOR-470 Commit: 88e8d9a15d4f1690a6df1c1cabb355936f327aaa Parents: 9383aa3 Author: Qin Meijie <meijie....@eques.com.cn> Authored: Fri Jun 8 13:06:15 2018 +0800 Committer: Qin Meijie <meijie....@eques.com.cn> Committed: Fri Jun 8 13:06:15 2018 +0800 ---------------------------------------------------------------------- .../details/ServiceCacheEventListener.java | 50 +++++++++ .../x/discovery/details/ServiceCacheImpl.java | 109 +++++++++++++------ .../curator/x/discovery/TestServiceCache.java | 80 ++++++++++++++ 3 files changed, 207 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/88e8d9a1/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java new file mode 100644 index 0000000..1f783f3 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.discovery.details; + +import org.apache.curator.x.discovery.ServiceInstance; + +/** + * Listener for events (addition/update/deletion) that happen to a service cache + */ +public interface ServiceCacheEventListener<T> extends ServiceCacheListener +{ + + /** + * Called when a new cache is added. + * + * @param added instance added + */ + public void cacheAdded(ServiceInstance<T> added); + + /** + * Called when a cache is deleted. + * + * @param deleted instance deleted + */ + public void cacheDeleted(ServiceInstance<T> deleted); + + /** + * Called when a cache is updated. + * + * @param old old instance + * @param updated updated instance + */ + public void cacheUpdated(ServiceInstance<T> old, ServiceInstance<T> updated); +} http://git-wip-us.apache.org/repos/asf/curator/blob/88e8d9a1/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index b8f39d5..df6696a 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -93,7 +93,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi cache.start(true); for ( ChildData childData : cache.getCurrentData() ) { - addInstance(childData, true); + addInstanceOnlyIfAbsent(childData); } discovery.cacheOpened(this); } @@ -146,40 +146,72 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - boolean notifyListeners = false; - switch ( event.getType() ) + final Tuple<T> tuple; + switch ( event.getType() ) { case CHILD_ADDED: + tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach( + new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheAdded(tuple.newInstance); + } + + return null; + } + } + ); + break; case CHILD_UPDATED: { - addInstance(event.getData(), false); - notifyListeners = true; + tuple = addOrUpdateInstance(event.getData()); + listenerContainer.forEach( + new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheUpdated(tuple.oldInstance, tuple.newInstance); + } + + return null; + } + } + ); break; } case CHILD_REMOVED: { - instances.remove(instanceIdFromData(event.getData())); - notifyListeners = true; + final ServiceInstance<T> serviceInstance = instances.remove(instanceIdFromData(event.getData())); + listenerContainer.forEach( + new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + + if(listener instanceof ServiceCacheEventListener) { + ((ServiceCacheEventListener) listener).cacheDeleted(serviceInstance); + } + + return null; + } + } + ); break; } } - - if ( notifyListeners ) - { - listenerContainer.forEach - ( - new Function<ServiceCacheListener, Void>() - { - @Override - public Void apply(ServiceCacheListener listener) - { - listener.cacheChanged(); - return null; - } - } - ); - } } private String instanceIdFromData(ChildData childData) @@ -187,18 +219,31 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi return ZKPaths.getNodeFromPath(childData.getPath()); } - private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception + private void addInstanceOnlyIfAbsent(ChildData childData) throws Exception { String instanceId = instanceIdFromData(childData); ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); - if ( onlyIfAbsent ) - { - instances.putIfAbsent(instanceId, serviceInstance); - } - else - { - instances.put(instanceId, serviceInstance); - } + + instances.putIfAbsent(instanceId, serviceInstance); + cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + } + + private Tuple<T> addOrUpdateInstance(ChildData childData) throws Exception + { + String instanceId = instanceIdFromData(childData); + ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); + final Tuple<T> result = new Tuple<>(instances.put(instanceId, serviceInstance), serviceInstance); cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + return result; + } + + private static class Tuple<T> { + public final ServiceInstance<T> oldInstance; + public final ServiceInstance<T> newInstance; + + private Tuple(final ServiceInstance<T> oldInstance, final ServiceInstance<T> newInstance) { + this.oldInstance = oldInstance; + this.newInstance = newInstance; + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/88e8d9a1/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index fda5c26..65b7cb9 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -28,6 +28,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.details.ServiceCacheEventListener; import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.testng.Assert; import org.testng.annotations.Test; @@ -39,6 +40,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class TestServiceCache extends BaseClassForTests { @@ -310,4 +312,82 @@ public class TestServiceCache extends BaseClassForTests } } } + + @Test + public void testServiceCacheEventListener() throws Exception + { + List<Closeable> closeables = Lists.newArrayList(); + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build(); + closeables.add(discovery); + discovery.start(); + + ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").build(); + closeables.add(cache); + + final CountDownLatch latch = new CountDownLatch(6); + + final AtomicBoolean notifyError = new AtomicBoolean(false); + ServiceCacheListener listener = new ServiceCacheEventListener<String>() + { + @Override + public void cacheAdded(final ServiceInstance<String> added) { + latch.countDown(); + + notifyError.compareAndSet(false,added == null); + } + + @Override + public void cacheDeleted(final ServiceInstance<String> deleted) { + latch.countDown(); + + notifyError.compareAndSet(false,deleted == null); + } + + @Override + public void cacheUpdated(final ServiceInstance<String> before, final ServiceInstance<String> after) { + latch.countDown(); + + notifyError.compareAndSet(false, !"before".equals(before.getPayload())); + notifyError.compareAndSet(false, !"after".equals(after.getPayload())); + } + + @Override + public void cacheChanged() + { + latch.countDown(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + + } + }; + cache.addListener(listener); + cache.start(); + + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("before").name("test").port(10064).build(); + discovery.registerService(instance); + instance = ServiceInstance.<String>builder().id(instance.getId()).payload("after").name("test").port(10064).build(); + discovery.updateService(instance); + discovery.unregisterService(instance); + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Assert.assertFalse(notifyError.get()); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } }