When unregistering a service, remove it from the internal map first and then
perform a guaranteed delete of the node.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/915d83ad
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/915d83ad
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/915d83ad

Branch: refs/heads/CURATOR-3.0
Commit: 915d83add911d624ab3584508f566344827fbae6
Parents: c65e091
Author: randgalt <randg...@apache.org>
Authored: Tue Apr 21 12:31:17 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Tue Apr 21 12:31:17 2015 -0500

----------------------------------------------------------------------
 .../discovery/details/ServiceDiscoveryImpl.java | 62 ++++++++--------
 .../x/discovery/TestServiceDiscovery.java       | 74 +++++++++++++++++++-
 2 files changed, 103 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/915d83ad/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 41c5d77..824eb75 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -149,7 +150,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             ServiceInstance<T> service = it.next();
             String path = pathForInstance(service.getName(), service.getId());
             boolean doRemove = true;
-            
+
             try
             {
                 client.delete().forPath(path);
@@ -163,13 +164,13 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
                 doRemove = false;
                 log.error("Could not unregister instance: " + service.getName(), e);
             }
-            
+
             if ( doRemove )
             {
                 it.remove();
             }
         }
-        
+
         client.getConnectionStateListenable().removeListener(connectionStateListener);
     }
 
@@ -189,25 +190,25 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public void updateService(ServiceInstance<T> service) throws Exception
     {
-        byte[]          bytes = serializer.serialize(service);
-        String          path = pathForInstance(service.getName(), service.getId());
+        byte[] bytes = serializer.serialize(service);
+        String path = pathForInstance(service.getName(), service.getId());
         client.setData().forPath(path, bytes);
         services.put(service.getId(), service);
     }
 
     @VisibleForTesting
-    protected void     internalRegisterService(ServiceInstance<T> service) throws Exception
+    protected void internalRegisterService(ServiceInstance<T> service) throws Exception
     {
-        byte[]          bytes = serializer.serialize(service);
-        String          path = pathForInstance(service.getName(), service.getId());
+        byte[] bytes = serializer.serialize(service);
+        String path = pathForInstance(service.getName(), service.getId());
 
-        final int       MAX_TRIES = 2;
-        boolean         isDone = false;
+        final int MAX_TRIES = 2;
+        boolean isDone = false;
         for ( int i = 0; !isDone && (i < MAX_TRIES); ++i )
         {
             try
             {
-                CreateMode      mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
+                CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
                 client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes);
                 isDone = true;
             }
@@ -225,18 +226,19 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
      * @throws Exception errors
      */
     @Override
-    public void     unregisterService(ServiceInstance<T> service) throws Exception
+    public void unregisterService(ServiceInstance<T> service) throws Exception
     {
-        String          path = pathForInstance(service.getName(), service.getId());
+        services.remove(service.getId());
+
+        String path = pathForInstance(service.getName(), service.getId());
         try
         {
-            client.delete().forPath(path);
+            client.delete().guaranteed().forPath(path);
         }
         catch ( KeeperException.NoNodeException ignore )
         {
             // ignore
         }
-        services.remove(service.getId());
     }
 
     /**
@@ -271,9 +273,9 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
      * @throws Exception errors
      */
     @Override
-    public Collection<String>   queryForNames() throws Exception
+    public Collection<String> queryForNames() throws Exception
     {
-        List<String>        names = client.getChildren().forPath(basePath);
+        List<String> names = client.getChildren().forPath(basePath);
         return ImmutableList.copyOf(names);
     }
 
@@ -285,7 +287,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
      * @throws Exception errors
      */
     @Override
-    public Collection<ServiceInstance<T>>  queryForInstances(String name) throws Exception
+    public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception
     {
         return queryForInstances(name, null);
     }
@@ -301,10 +303,10 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public ServiceInstance<T> queryForInstance(String name, String id) throws Exception
     {
-        String          path = pathForInstance(name, id);
+        String path = pathForInstance(name, id);
         try
         {
-            byte[]          bytes = client.getData().forPath(path);
+            byte[] bytes = client.getData().forPath(path);
             return serializer.deserialize(bytes);
         }
         catch ( KeeperException.NoNodeException ignore )
@@ -314,22 +316,22 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         return null;
     }
 
-    void    cacheOpened(ServiceCache<T> cache)
+    void cacheOpened(ServiceCache<T> cache)
     {
         caches.add(cache);
     }
 
-    void    cacheClosed(ServiceCache<T> cache)
+    void cacheClosed(ServiceCache<T> cache)
     {
         caches.remove(cache);
     }
 
-    void    providerOpened(ServiceProvider<T> provider)
+    void providerOpened(ServiceProvider<T> provider)
     {
         providers.add(provider);
     }
 
-    void    providerClosed(ServiceProvider<T> cache)
+    void providerClosed(ServiceProvider<T> cache)
     {
         providers.remove(cache);
     }
@@ -339,7 +341,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         return client;
     }
 
-    String  pathForName(String name)
+    String pathForName(String name)
     {
         return ZKPaths.makePath(basePath, name);
     }
@@ -349,11 +351,11 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         return serializer;
     }
 
-    List<ServiceInstance<T>>  queryForInstances(String name, Watcher watcher) throws Exception
+    List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
     {
-        ImmutableList.Builder<ServiceInstance<T>>   builder = ImmutableList.builder();
-        String                  path = pathForName(name);
-        List<String>            instanceIds;
+        ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder();
+        String path = pathForName(name);
+        List<String> instanceIds;
 
         if ( watcher != null )
         {
@@ -384,7 +386,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
 
     private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
     {
-        List<String>    instanceIds;
+        List<String> instanceIds;
         try
         {
             instanceIds = client.getChildren().usingWatcher(watcher).forPath(path);

http://git-wip-us.apache.org/repos/asf/curator/blob/915d83ad/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
index 6eb9ebb..40d491a 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
@@ -28,6 +28,7 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
 import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
 import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
 import org.testng.Assert;
@@ -37,7 +38,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 public class TestServiceDiscovery extends BaseClassForTests
 {
@@ -269,15 +272,15 @@ public class TestServiceDiscovery extends BaseClassForTests
     public void testNoServerOnStart() throws Exception
     {
         server.stop();
-        List<Closeable>     closeables = Lists.newArrayList();
+        List<Closeable> closeables = Lists.newArrayList();
         try
         {
             CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             closeables.add(client);
             client.start();
 
-            ServiceInstance<String>     instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String>    discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
             closeables.add(discovery);
             discovery.start();
 
@@ -297,4 +300,69 @@ public class TestServiceDiscovery extends BaseClassForTests
             }
         }
     }
+
+    // CURATOR-164
+    @Test
+    public void testUnregisterService() throws Exception
+    {
+        final String name = "name";
+
+        final CountDownLatch restartLatch = new CountDownLatch(1);
+        List<Closeable> closeables = Lists.newArrayList();
+
+        InstanceSerializer<String> slowSerializer = new JsonInstanceSerializer<String>(String.class)
+        {
+            private boolean first = true;
+
+            @Override
+            public byte[] serialize(ServiceInstance<String> instance) throws Exception
+            {
+                if ( first )
+                {
+                    System.out.println("Serializer first registration.");
+                    first = false;
+                }
+                else
+                {
+                    System.out.println("Waiting for reconnect to finish.");
+                    // Simulate the serialize method being slow.
+                    // This could just be a timed wait, but that's kind of non-deterministic.
+                    restartLatch.await();
+                }
+                return super.serialize(instance);
+            }
+        };
+
+        try
+        {
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).build();
+            closeables.add(discovery);
+            discovery.start();
+
+            Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), "Service should start registered.");
+
+            server.stop();
+            server.restart();
+
+            discovery.unregisterService(instance);
+            restartLatch.countDown();
+
+            TimeUnit.SECONDS.sleep(1); // Wait for the rest of registration to finish.
+
+            Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), "Service should have unregistered.");
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
 }

Reply via email to