Added a way to watch registered services so that tools such as admin consoles can change values and have SD recognize the changes
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/37dc4478 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/37dc4478 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/37dc4478 Branch: refs/heads/master Commit: 37dc4478597c6db0dcab83b636318b51bb389c58 Parents: 742e092 Author: randgalt <randg...@apache.org> Authored: Sun Jan 4 17:03:29 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Jan 4 17:03:29 2015 -0500 ---------------------------------------------------------------------- .../x/discovery/ServiceDiscoveryBuilder.java | 45 +++++++---- .../discovery/details/ServiceDiscoveryImpl.java | 59 +++++++++++++-- .../x/discovery/TestServiceDiscovery.java | 79 ++++++++++---------- .../discovery/details/TestWatchedInstances.java | 76 +++++++++++++++++++ 4 files changed, 200 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java index 2b972ca..e25fc67 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceDiscoveryBuilder.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.x.discovery; import org.apache.curator.framework.CuratorFramework; @@ -25,20 +26,21 @@ import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl; public class ServiceDiscoveryBuilder<T> { - private CuratorFramework client; - private String basePath; - private InstanceSerializer<T> serializer; - private ServiceInstance<T> thisInstance; - private Class<T> payloadClass; + private CuratorFramework client; + private String basePath; + private InstanceSerializer<T> serializer; + private ServiceInstance<T> thisInstance; + private Class<T> payloadClass; + private boolean watchInstances = false; /** * Return a new builder. * * @param payloadClass the class of the payload of your service instance (you can use {@link Void} - * if your instances don't need a payload) + * if your instances don't need a payload) * @return new builder */ - public static<T> ServiceDiscoveryBuilder<T> builder(Class<T> payloadClass) + public static <T> ServiceDiscoveryBuilder<T> builder(Class<T> payloadClass) { return new ServiceDiscoveryBuilder<T>(payloadClass); } @@ -49,12 +51,13 @@ public class ServiceDiscoveryBuilder<T> * * @return new service discovery */ - public ServiceDiscovery<T> build() + public ServiceDiscovery<T> build() { - if ( serializer == null ) { + if ( serializer == null ) + { serializer(new JsonInstanceSerializer<T>(payloadClass)); } - return new ServiceDiscoveryImpl<T>(client, basePath, serializer, thisInstance); + return new ServiceDiscoveryImpl<T>(client, basePath, serializer, thisInstance, watchInstances); } /** @@ -63,7 +66,7 @@ public class ServiceDiscoveryBuilder<T> * @param client client * @return this */ - public ServiceDiscoveryBuilder<T> client(CuratorFramework client) + public ServiceDiscoveryBuilder<T> client(CuratorFramework client) { this.client = client; return this; @@ -75,7 +78,7 @@ public class ServiceDiscoveryBuilder<T> * @param basePath base path * @return this */ - public ServiceDiscoveryBuilder<T> basePath(String basePath) + public ServiceDiscoveryBuilder<T> basePath(String basePath) { this.basePath = basePath; return this; @@ -87,7 +90,7 @@ public class ServiceDiscoveryBuilder<T> * @param serializer the serializer * @return this */ - public ServiceDiscoveryBuilder<T> serializer(InstanceSerializer<T> serializer) + public ServiceDiscoveryBuilder<T> serializer(InstanceSerializer<T> serializer) { this.serializer = serializer; return this; @@ -99,12 +102,26 @@ public class ServiceDiscoveryBuilder<T> * @param thisInstance initial instance * @return this */ - public ServiceDiscoveryBuilder<T> thisInstance(ServiceInstance<T> thisInstance) + public ServiceDiscoveryBuilder<T> thisInstance(ServiceInstance<T> thisInstance) { this.thisInstance = thisInstance; return this; } + /** + * Optional - if true, watches for changes to locally registered instances + * (via {@link #thisInstance(ServiceInstance)} or {@link ServiceDiscovery#registerService(ServiceInstance)}). + * If the data for instances changes, they are reloaded. + * + * @param watchInstances true to watch instances + * @return this + */ + public ServiceDiscoveryBuilder<T> watchInstances(boolean watchInstances) + { + this.watchInstances = watchInstances; + return this; + } + ServiceDiscoveryBuilder(Class<T> payloadClass) { this.payloadClass = payloadClass; http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index a55f678..ca8eabe 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -24,11 +24,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; - -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.discovery.ServiceCache; @@ -41,12 +44,11 @@ import org.apache.curator.x.discovery.ServiceType; import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -64,6 +66,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> private final Map<String, ServiceInstance<T>> services = Maps.newConcurrentMap(); private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap()); private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap()); + private final boolean watchInstances; private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override @@ -89,9 +92,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> * @param basePath base path to store data * @param serializer serializer for instances (e.g. {@link JsonInstanceSerializer}) * @param thisInstance instance that represents the service that is running. The instance will get auto-registered + * @param watchInstances if true, watches for changes to locally registered instances */ - public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance) + public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean watchInstances) { + this.watchInstances = watchInstances; this.client = Preconditions.checkNotNull(client, "client cannot be null"); this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null"); this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null"); @@ -192,6 +197,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> { CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes); + if ( watchInstances ) + { + resetWatchedInstance(service); + } isDone = true; } catch ( KeeperException.NodeExistsException e ) @@ -365,6 +374,37 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> return builder.build(); } + private void resetWatchedInstance(final ServiceInstance<T> service) throws Exception + { + CuratorWatcher watcher = new CuratorWatcher() + { + @Override + public void process(WatchedEvent event) throws Exception + { + if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) + { + resetWatchedInstance(service); + } + } + }; + + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( event.getType() == CuratorEventType.GET_DATA ) + { + ServiceInstance<T> newInstance = serializer.deserialize(event.getData()); + services.put(newInstance.getId(), newInstance); + } + } + }; + + String path = pathForInstance(service.getName(), service.getId()); + client.getData().usingWatcher(watcher).inBackground(callback).forPath(path); + } + private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception { List<String> instanceIds; @@ -394,11 +434,18 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> return instanceIds; } - private String pathForInstance(String name, String id) throws UnsupportedEncodingException + @VisibleForTesting + String pathForInstance(String name, String id) { return ZKPaths.makePath(pathForName(name), id); } + @VisibleForTesting + ServiceInstance<T> getRegisteredService(String id) + { + return services.get(id); + } + private void reRegisterServices() throws Exception { for ( ServiceInstance<T> service : services.values() ) http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java index 73de7fc..0465599 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java @@ -16,17 +16,18 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.x.discovery; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.KillSession; import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl; import org.testng.Assert; @@ -40,7 +41,7 @@ import java.util.concurrent.Semaphore; public class TestServiceDiscovery extends BaseClassForTests { - private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>() + private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>() { @Override public int compare(ServiceInstance<Void> o1, ServiceInstance<Void> o2) @@ -50,20 +51,20 @@ public class TestServiceDiscovery extends BaseClassForTests }; @Test - public void testCrashedServerMultiInstances() throws Exception + public void testCrashedServerMultiInstances() throws Exception { - List<Closeable> closeables = Lists.newArrayList(); + List<Closeable> closeables = Lists.newArrayList(); try { - Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); closeables.add(client); client.start(); - final Semaphore semaphore = new Semaphore(0); - ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); - ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build(); - ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1) + final Semaphore semaphore = new Semaphore(0); + ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); + ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build(); + ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance1, false) { @Override protected void internalRegisterService(ServiceInstance<String> service) throws Exception @@ -98,19 +99,19 @@ public class TestServiceDiscovery extends BaseClassForTests } @Test - public void testCrashedServer() throws Exception + public void testCrashedServer() throws Exception { - List<Closeable> closeables = Lists.newArrayList(); + List<Closeable> closeables = Lists.newArrayList(); try { - Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); closeables.add(client); client.start(); - final Semaphore semaphore = new Semaphore(0); - ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); - ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance) + final Semaphore semaphore = new Semaphore(0); + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); + ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false) { @Override protected void internalRegisterService(ServiceInstance<String> service) throws Exception @@ -144,24 +145,24 @@ public class TestServiceDiscovery extends BaseClassForTests } @Test - public void testCrashedInstance() throws Exception + public void testCrashedInstance() throws Exception { - List<Closeable> closeables = Lists.newArrayList(); + List<Closeable> closeables = Lists.newArrayList(); try { - Timing timing = new Timing(); + Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); closeables.add(client); client.start(); - ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); - ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance); + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); + ServiceDiscovery<String> discovery = new ServiceDiscoveryImpl<String>(client, "/test", new JsonInstanceSerializer<String>(String.class), instance, false); closeables.add(discovery); discovery.start(); Assert.assertEquals(discovery.queryForInstances("test").size(), 1); - + KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString()); Thread.sleep(timing.multiple(1.5).session()); @@ -178,24 +179,24 @@ public class TestServiceDiscovery extends BaseClassForTests } @Test - public void testMultipleInstances() throws Exception + public void testMultipleInstances() throws Exception { - final String SERVICE_ONE = "one"; - final String SERVICE_TWO = "two"; + final String SERVICE_ONE = "one"; + final String SERVICE_TWO = "two"; - List<Closeable> closeables = Lists.newArrayList(); + List<Closeable> closeables = Lists.newArrayList(); try { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); closeables.add(client); client.start(); - ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build(); - ServiceInstance<Void> s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build(); - ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build(); - ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build(); + ServiceInstance<Void> s1_i1 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build(); + ServiceInstance<Void> s1_i2 = ServiceInstance.<Void>builder().name(SERVICE_ONE).build(); + ServiceInstance<Void> s2_i1 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build(); + ServiceInstance<Void> s2_i2 = ServiceInstance.<Void>builder().name(SERVICE_TWO).build(); - ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build(); + ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build(); closeables.add(discovery); discovery.start(); @@ -234,17 +235,17 @@ public class TestServiceDiscovery extends BaseClassForTests } @Test - public void testBasic() throws Exception + public void testBasic() throws Exception { - List<Closeable> closeables = Lists.newArrayList(); + List<Closeable> closeables = Lists.newArrayList(); try { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); closeables.add(client); client.start(); - - ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); - ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build(); + + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); + ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build(); closeables.add(discovery); discovery.start(); http://git-wip-us.apache.org/repos/asf/curator/blob/37dc4478/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java new file mode 100644 index 0000000..0a19b41 --- /dev/null +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java @@ -0,0 +1,76 @@ +package org.apache.curator.x.discovery.details; + +import com.google.common.collect.Lists; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.io.Closeable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class TestWatchedInstances extends BaseClassForTests +{ + @Test + public void testWatchedInstances() throws Exception + { + Timing timing = new Timing(); + List<Closeable> closeables = Lists.newArrayList(); + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); + ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder + .builder(String.class) + .basePath("/test") + .client(client) + .thisInstance(instance) + .watchInstances(true) + .build(); + closeables.add(discovery); + discovery.start(); + + Assert.assertEquals(discovery.queryForNames(), Arrays.asList("test")); + + List<ServiceInstance<String>> list = Lists.newArrayList(); + list.add(instance); + Assert.assertEquals(discovery.queryForInstances("test"), list); + + ServiceDiscoveryImpl<String> discoveryImpl = (ServiceDiscoveryImpl<String>)discovery; + ServiceInstance<String> changedInstance = ServiceInstance.<String>builder() + .id(instance.getId()) + .address(instance.getAddress()) + .payload("different") + .name(instance.getName()) + .port(instance.getPort()) + .build(); + String path = discoveryImpl.pathForInstance("test", instance.getId()); + byte[] bytes = discoveryImpl.getSerializer().serialize(changedInstance); + client.setData().forPath(path, bytes); + timing.sleepABit(); + + ServiceInstance<String> registeredService = discoveryImpl.getRegisteredService(instance.getId()); + Assert.assertNotNull(registeredService); + Assert.assertEquals(registeredService.getPayload(), "different"); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } +}