Repository: curator Updated Branches: refs/heads/CURATOR-164 [created] 915d83add
when unregistering a service remove it from the internal map first and then delete (guaranteed) the node Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/915d83ad Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/915d83ad Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/915d83ad Branch: refs/heads/CURATOR-164 Commit: 915d83add911d624ab3584508f566344827fbae6 Parents: c65e091 Author: randgalt <randg...@apache.org> Authored: Tue Apr 21 12:31:17 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Tue Apr 21 12:31:17 2015 -0500 ---------------------------------------------------------------------- .../discovery/details/ServiceDiscoveryImpl.java | 62 ++++++++-------- .../x/discovery/TestServiceDiscovery.java | 74 +++++++++++++++++++- 2 files changed, 103 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/915d83ad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index 41c5d77..824eb75 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.x.discovery.details; import com.google.common.annotations.VisibleForTesting; @@ -149,7 +150,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> ServiceInstance<T> service = it.next(); String path = pathForInstance(service.getName(), service.getId()); boolean doRemove = true; - + try { client.delete().forPath(path); @@ -163,13 +164,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> doRemove = false; log.error("Could not unregister instance: " + service.getName(), e); } - + if ( doRemove ) { it.remove(); } } - + client.getConnectionStateListenable().removeListener(connectionStateListener); } @@ -189,25 +190,25 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> @Override public void updateService(ServiceInstance<T> service) throws Exception { - byte[] bytes = serializer.serialize(service); - String path = pathForInstance(service.getName(), service.getId()); + byte[] bytes = serializer.serialize(service); + String path = pathForInstance(service.getName(), service.getId()); client.setData().forPath(path, bytes); services.put(service.getId(), service); } @VisibleForTesting - protected void internalRegisterService(ServiceInstance<T> service) throws Exception + protected void internalRegisterService(ServiceInstance<T> service) throws Exception { - byte[] bytes = serializer.serialize(service); - String path = pathForInstance(service.getName(), service.getId()); + byte[] bytes = serializer.serialize(service); + String path = pathForInstance(service.getName(), service.getId()); - final int MAX_TRIES = 2; - boolean isDone = false; + final int MAX_TRIES = 2; + boolean isDone = false; for ( int i = 0; !isDone && (i < MAX_TRIES); ++i ) { try { - CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; + CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes); isDone = true; } @@ -225,18 +226,19 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> * @throws Exception errors */ @Override - public void unregisterService(ServiceInstance<T> service) throws Exception + public void unregisterService(ServiceInstance<T> service) throws Exception { - String path = pathForInstance(service.getName(), service.getId()); + services.remove(service.getId()); + + String path = pathForInstance(service.getName(), service.getId()); try { - client.delete().forPath(path); + client.delete().guaranteed().forPath(path); } catch ( KeeperException.NoNodeException ignore ) { // ignore } - services.remove(service.getId()); } /** @@ -271,9 +273,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> * @throws Exception errors */ @Override - public Collection<String> queryForNames() throws Exception + public Collection<String> queryForNames() throws Exception { - List<String> names = client.getChildren().forPath(basePath); + List<String> names = client.getChildren().forPath(basePath); return ImmutableList.copyOf(names); } @@ -285,7 +287,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> * @throws Exception errors */ @Override - public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception + public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception { return queryForInstances(name, null); } @@ -301,10 +303,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> @Override public ServiceInstance<T> queryForInstance(String name, String id) throws Exception { - String path = pathForInstance(name, id); + String path = pathForInstance(name, id); try { - byte[] bytes = client.getData().forPath(path); + byte[] bytes = client.getData().forPath(path); return serializer.deserialize(bytes); } catch ( KeeperException.NoNodeException ignore ) @@ -314,22 +316,22 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> return null; } - void cacheOpened(ServiceCache<T> cache) + void cacheOpened(ServiceCache<T> cache) { caches.add(cache); } - void cacheClosed(ServiceCache<T> cache) + void cacheClosed(ServiceCache<T> cache) { caches.remove(cache); } - void providerOpened(ServiceProvider<T> provider) + void providerOpened(ServiceProvider<T> provider) { providers.add(provider); } - void providerClosed(ServiceProvider<T> cache) + void providerClosed(ServiceProvider<T> cache) { providers.remove(cache); } @@ -339,7 +341,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> return client; } - String pathForName(String name) + String pathForName(String name) { return ZKPaths.makePath(basePath, name); } @@ -349,11 +351,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> return serializer; } - List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception + List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception { - ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder(); - String path = pathForName(name); - List<String> instanceIds; + ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder(); + String path = pathForName(name); + List<String> instanceIds; if ( watcher != null ) { @@ -384,7 +386,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception { - List<String> instanceIds; + List<String> instanceIds; try { instanceIds = client.getChildren().usingWatcher(watcher).forPath(path); http://git-wip-us.apache.org/repos/asf/curator/blob/915d83ad/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java index 6eb9ebb..40d491a 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java @@ -28,6 +28,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.KillSession; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.details.InstanceSerializer; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl; import org.testng.Assert; @@ -37,7 +38,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; public class TestServiceDiscovery extends BaseClassForTests { @@ -269,15 +272,15 @@ public class TestServiceDiscovery extends BaseClassForTests public void testNoServerOnStart() throws Exception { server.stop(); - List<Closeable> closeables = Lists.newArrayList(); + List<Closeable> closeables = Lists.newArrayList(); try { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); closeables.add(client); client.start(); - ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); - ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build(); + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); + ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build(); closeables.add(discovery); discovery.start(); @@ -297,4 +300,69 @@ public class TestServiceDiscovery extends BaseClassForTests } } } + + // CURATOR-164 + @Test + public void testUnregisterService() throws Exception + { + final String name = "name"; + + final CountDownLatch restartLatch = new CountDownLatch(1); + List<Closeable> closeables = Lists.newArrayList(); + + InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class) + { + private boolean first = true; + + @Override + public byte[] serialize(ServiceInstance<String> instance) throws Exception + { + if ( first ) + { + System.out.println("Serializer first registration."); + first = false; + } + else + { + System.out.println("Waiting for reconnect to finish."); + // Simulate the serialize method being slow. + // This could just be a timed wait, but that's kind of non-deterministic. + restartLatch.await(); + } + return super.serialize(instance); + } + }; + + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build(); + ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).build(); + closeables.add(discovery); + discovery.start(); + + Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered."); + + server.stop(); + server.restart(); + + discovery.unregisterService(instance); + restartLatch.countDown(); + + TimeUnit.SECONDS.sleep(1); // Wait for the rest of registration to finish. + + Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered."); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } }