Repository: curator
Updated Branches:
  refs/heads/CURATOR-164 0178c8303 -> f489dfebe


Concurrent registrations/unregistrations and connection issues can cause 
inconsistent state. Change to a model whereby 'unregistering' an instance 
doesn't remove it from management but changes the state. Instance will still be 
managed for a period of time and clean after a reasonable period


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f489dfeb
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f489dfeb
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f489dfeb

Branch: refs/heads/CURATOR-164
Commit: f489dfebeed4ecf004ded37a0f05a0a8a2dc7e6d
Parents: 0178c83
Author: randgalt <randg...@apache.org>
Authored: Tue Apr 21 15:13:26 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Tue Apr 21 15:13:26 2015 -0500

----------------------------------------------------------------------
 .../discovery/details/ServiceDiscoveryImpl.java | 190 ++++++---
 .../x/discovery/TestServiceDiscovery.java       | 368 -----------------
 .../discovery/details/TestServiceDiscovery.java | 402 +++++++++++++++++++
 3 files changed, 546 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 824eb75..f53c7ce 100644
--- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -50,7 +50,10 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A mechanism to register and query service instances using ZooKeeper
@@ -61,10 +64,11 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     private final CuratorFramework client;
     private final String basePath;
     private final InstanceSerializer<T> serializer;
-    private final Map<String, ServiceInstance<T>> services = 
Maps.newConcurrentMap();
-    private final Map<String, NodeCache> watchedServices;
+    private final ConcurrentMap<String, Holder<T>> services = 
Maps.newConcurrentMap();
     private final Collection<ServiceCache<T>> caches = 
Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
     private final Collection<ServiceProvider<T>> providers = 
Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
+    private final boolean watchInstances;
+    private final AtomicLong lastCleanMs = new 
AtomicLong(System.currentTimeMillis());
     private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
     {
         @Override
@@ -85,6 +89,35 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
         }
     };
 
+    private static final int CLEAN_THRESHOLD_MS = 
Integer.getInteger("curator-discovery-clean-threshold-ms", 
(int)TimeUnit.MINUTES.toMillis(5));
+
+    private enum State
+    {
+        NEW,
+        REGISTERED,
+        UNREGISTERED
+    }
+
+    private static class Holder<T>
+    {
+        private final AtomicReference<ServiceInstance<T>> service = new 
AtomicReference<ServiceInstance<T>>();
+        private final AtomicReference<NodeCache> cache = new 
AtomicReference<NodeCache>();
+        private final AtomicReference<State> state = new 
AtomicReference<State>();
+        private final AtomicLong stateChangeMs = new AtomicLong();
+
+        public Holder(ServiceInstance<T> instance)
+        {
+            service.set(instance);
+            setState(State.NEW);
+        }
+
+        public void setState(State state)
+        {
+            this.state.set(state);
+            stateChangeMs.set(System.currentTimeMillis());
+        }
+    }
+
     /**
      * @param client the client
      * @param basePath base path to store data
@@ -94,10 +127,10 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
      */
     public ServiceDiscoveryImpl(CuratorFramework client, String basePath, 
InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean 
watchInstances)
     {
+        this.watchInstances = watchInstances;
         this.client = Preconditions.checkNotNull(client, "client cannot be 
null");
         this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot 
be null");
         this.serializer = Preconditions.checkNotNull(serializer, "serializer 
cannot be null");
-        watchedServices = watchInstances ? Maps.<String, 
NodeCache>newConcurrentMap() : null;
         if ( thisInstance != null )
         {
             setService(thisInstance);
@@ -134,26 +167,12 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
         {
             CloseableUtils.closeQuietly(provider);
         }
-        if ( watchedServices != null )
-        {
-            for ( NodeCache nodeCache : watchedServices.values() )
-            {
-                CloseableUtils.closeQuietly(nodeCache);
-            }
-        }
 
-        Iterator<ServiceInstance<T>> it = services.values().iterator();
-        while ( it.hasNext() )
+        for ( Holder<T> holder : services.values() )
         {
-            // Should not use unregisterService because of potential 
ConcurrentModificationException
-            // so we in-line the bulk of the method here
-            ServiceInstance<T> service = it.next();
-            String path = pathForInstance(service.getName(), service.getId());
-            boolean doRemove = true;
-
             try
             {
-                client.delete().forPath(path);
+                internalUnregisterService(holder);
             }
             catch ( KeeperException.NoNodeException ignore )
             {
@@ -161,13 +180,7 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
             }
             catch ( Exception e )
             {
-                doRemove = false;
-                log.error("Could not unregister instance: " + 
service.getName(), e);
-            }
-
-            if ( doRemove )
-            {
-                it.remove();
+                log.error("Could not unregister instance: " + 
holder.service.get().getName(), e);
             }
         }
 
@@ -183,6 +196,8 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public void registerService(ServiceInstance<T> service) throws Exception
     {
+        clean();
+
         setService(service);
         internalRegisterService(service);
     }
@@ -190,10 +205,18 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public void updateService(ServiceInstance<T> service) throws Exception
     {
+        clean();
+
+        Holder<T> holder = getOrMakeHolder(service, null);
+        if ( holder.state.get() == State.UNREGISTERED )
+        {
+            throw new Exception("Service has been unregistered: " + service);
+        }
+
+        holder.service.set(service);
         byte[] bytes = serializer.serialize(service);
         String path = pathForInstance(service.getName(), service.getId());
         client.setData().forPath(path, bytes);
-        services.put(service.getId(), service);
     }
 
     @VisibleForTesting
@@ -228,17 +251,9 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public void unregisterService(ServiceInstance<T> service) throws Exception
     {
-        services.remove(service.getId());
+        clean();
 
-        String path = pathForInstance(service.getName(), service.getId());
-        try
-        {
-            client.delete().guaranteed().forPath(path);
-        }
-        catch ( KeeperException.NoNodeException ignore )
-        {
-            // ignore
-        }
+        internalUnregisterService(getOrMakeHolder(service, null));
     }
 
     /**
@@ -249,6 +264,8 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public ServiceProviderBuilder<T> serviceProviderBuilder()
     {
+        clean();
+
         return new ServiceProviderBuilderImpl<T>(this)
             .providerStrategy(new RoundRobinStrategy<T>())
             .threadFactory(ThreadUtils.newThreadFactory("ServiceProvider"));
@@ -262,6 +279,8 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public ServiceCacheBuilder<T> serviceCacheBuilder()
     {
+        clean();
+
         return new ServiceCacheBuilderImpl<T>(this)
             .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
     }
@@ -275,6 +294,8 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public Collection<String> queryForNames() throws Exception
     {
+        clean();
+
         List<String> names = client.getChildren().forPath(basePath);
         return ImmutableList.copyOf(names);
     }
@@ -303,6 +324,8 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @Override
     public ServiceInstance<T> queryForInstance(String name, String id) throws 
Exception
     {
+        clean();
+
         String path = pathForInstance(name, id);
         try
         {
@@ -338,6 +361,8 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
 
     CuratorFramework getClient()
     {
+        clean();
+
         return client;
     }
 
@@ -353,6 +378,8 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
 
     List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) 
throws Exception
     {
+        clean();
+
         ImmutableList.Builder<ServiceInstance<T>> builder = 
ImmutableList.builder();
         String path = pathForName(name);
         List<String> instanceIds;
@@ -384,6 +411,12 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
         return builder.build();
     }
 
+    @VisibleForTesting
+    int debugServicesQty()
+    {
+        return services.size();
+    }
+
     private List<String> getChildrenWatched(String path, Watcher watcher, 
boolean recurse) throws Exception
     {
         List<String> instanceIds;
@@ -422,23 +455,29 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
     @VisibleForTesting
     ServiceInstance<T> getRegisteredService(String id)
     {
-        return services.get(id);
+        Holder<T> holder = services.get(id);
+        return ((holder != null) && (holder.state.get() == State.REGISTERED)) 
? holder.service.get() : null;
     }
 
     private void reRegisterServices() throws Exception
     {
-        for ( ServiceInstance<T> service : services.values() )
+        for ( Holder<T> service : services.values() )
         {
-            internalRegisterService(service);
+            if ( service.state.get() == State.REGISTERED )
+            {
+                internalRegisterService(service.service.get());
+            }
         }
     }
 
     private void setService(final ServiceInstance<T> instance)
     {
-        services.put(instance.getId(), instance);
-        if ( watchedServices != null )
+        final NodeCache nodeCache = watchInstances ? new NodeCache(client, 
pathForInstance(instance.getName(), instance.getId())) : null;
+        Holder<T> holder = getOrMakeHolder(instance, nodeCache);
+        holder.setState(State.REGISTERED);
+
+        if ( nodeCache != null )
         {
-            final NodeCache nodeCache = new NodeCache(client, 
pathForInstance(instance.getName(), instance.getId()));
             try
             {
                 nodeCache.start(true);
@@ -455,7 +494,11 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
                     if ( nodeCache.getCurrentData() != null )
                     {
                         ServiceInstance<T> newInstance = 
serializer.deserialize(nodeCache.getCurrentData().getData());
-                        services.put(newInstance.getId(), newInstance);
+                        Holder<T> holder = services.get(newInstance.getId());
+                        if ( holder != null )
+                        {
+                            holder.service.set(newInstance);
+                        }
                     }
                     else
                     {
@@ -464,7 +507,62 @@ public class ServiceDiscoveryImpl<T> implements 
ServiceDiscovery<T>
                 }
             };
             nodeCache.getListenable().addListener(listener);
-            watchedServices.put(instance.getId(), nodeCache);
+        }
+    }
+
+    private Holder<T> getOrMakeHolder(ServiceInstance<T> instance, NodeCache 
nodeCache)
+    {
+        Holder<T> newHolder = new Holder<T>(instance);
+        Holder<T> oldHolder = services.putIfAbsent(instance.getId(), 
newHolder);
+        Holder<T> useHolder = (oldHolder != null) ? oldHolder : newHolder;
+        useHolder.cache.set(nodeCache);
+        return useHolder;
+    }
+
+    private void clean()
+    {
+        long localLastCleanMs = lastCleanMs.get();
+        long now = System.currentTimeMillis();
+        long elpased = now - localLastCleanMs;
+        if ( (elpased >= CLEAN_THRESHOLD_MS) && 
lastCleanMs.compareAndSet(localLastCleanMs, now + 1) )
+        {
+            Iterator<Holder<T>> iterator = services.values().iterator();
+            while ( iterator.hasNext() )
+            {
+                Holder<T> holder = iterator.next();
+                if ( holder.state.get() == State.UNREGISTERED )
+                {
+                    long elapsed = System.currentTimeMillis() - 
holder.stateChangeMs.get();
+                    if ( elapsed >= CLEAN_THRESHOLD_MS )
+                    {
+                        iterator.remove();
+                    }
+                }
+            }
+        }
+    }
+
+    private void internalUnregisterService(Holder<T> holder) throws Exception
+    {
+        if ( holder != null )
+        {
+            holder.setState(State.UNREGISTERED);
+            NodeCache cache = holder.cache.getAndSet(null);
+            if ( cache != null )
+            {
+                CloseableUtils.closeQuietly(cache);
+            }
+
+            ServiceInstance<T> service = holder.service.get();
+            String path = pathForInstance(service.getName(), service.getId());
+            try
+            {
+                client.delete().guaranteed().forPath(path);
+            }
+            catch ( KeeperException.NoNodeException ignore )
+            {
+                // ignore
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
deleted file mode 100644
index 3b45494..0000000
--- 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.x.discovery;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.KillSession;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
-import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.io.Closeable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-public class TestServiceDiscovery extends BaseClassForTests
-{
-    private static final Comparator<ServiceInstance<Void>> comparator = new 
Comparator<ServiceInstance<Void>>()
-    {
-        @Override
-        public int compare(ServiceInstance<Void> o1, ServiceInstance<Void> o2)
-        {
-            return o1.getId().compareTo(o2.getId());
-        }
-    };
-
-    @Test
-    public void testCrashedServerMultiInstances() throws Exception
-    {
-        List<Closeable> closeables = Lists.newArrayList();
-        try
-        {
-            Timing timing = new Timing();
-            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
-            closeables.add(client);
-            client.start();
-
-            final Semaphore semaphore = new Semaphore(0);
-            ServiceInstance<String> instance1 = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceInstance<String> instance2 = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
-            ServiceDiscovery<String> discovery = new 
ServiceDiscoveryImpl<String>(client, "/test", new 
JsonInstanceSerializer<String>(String.class), instance1, false)
-            {
-                @Override
-                protected void internalRegisterService(ServiceInstance<String> 
service) throws Exception
-                {
-                    super.internalRegisterService(service);
-                    semaphore.release();
-                }
-            };
-            closeables.add(discovery);
-            discovery.start();
-            discovery.registerService(instance2);
-
-            timing.acquireSemaphore(semaphore, 2);
-            Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
-
-            KillSession.kill(client.getZookeeperClient().getZooKeeper(), 
server.getConnectString());
-            server.stop();
-
-            server.restart();
-            closeables.add(server);
-
-            timing.acquireSemaphore(semaphore, 2);
-            Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
-        }
-        finally
-        {
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
-        }
-    }
-
-    @Test
-    public void testCrashedServer() throws Exception
-    {
-        List<Closeable> closeables = Lists.newArrayList();
-        try
-        {
-            Timing timing = new Timing();
-            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
-            closeables.add(client);
-            client.start();
-
-            final Semaphore semaphore = new Semaphore(0);
-            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = new 
ServiceDiscoveryImpl<String>(client, "/test", new 
JsonInstanceSerializer<String>(String.class), instance, false)
-            {
-                @Override
-                protected void internalRegisterService(ServiceInstance<String> 
service) throws Exception
-                {
-                    super.internalRegisterService(service);
-                    semaphore.release();
-                }
-            };
-            closeables.add(discovery);
-            discovery.start();
-
-            timing.acquireSemaphore(semaphore);
-            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-
-            KillSession.kill(client.getZookeeperClient().getZooKeeper(), 
server.getConnectString());
-            server.stop();
-
-            server.restart();
-            closeables.add(server);
-
-            timing.acquireSemaphore(semaphore);
-            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-        }
-        finally
-        {
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
-        }
-    }
-
-    @Test
-    public void testCrashedInstance() throws Exception
-    {
-        List<Closeable> closeables = Lists.newArrayList();
-        try
-        {
-            Timing timing = new Timing();
-
-            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
-            closeables.add(client);
-            client.start();
-
-            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = new 
ServiceDiscoveryImpl<String>(client, "/test", new 
JsonInstanceSerializer<String>(String.class), instance, false);
-            closeables.add(discovery);
-            discovery.start();
-
-            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-
-            KillSession.kill(client.getZookeeperClient().getZooKeeper(), 
server.getConnectString());
-            Thread.sleep(timing.multiple(1.5).session());
-
-            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
-        }
-        finally
-        {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
-        }
-    }
-
-    @Test
-    public void testMultipleInstances() throws Exception
-    {
-        final String SERVICE_ONE = "one";
-        final String SERVICE_TWO = "two";
-
-        List<Closeable> closeables = Lists.newArrayList();
-        try
-        {
-            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
-            closeables.add(client);
-            client.start();
-
-            ServiceInstance<Void> s1_i1 = 
ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
-            ServiceInstance<Void> s1_i2 = 
ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
-            ServiceInstance<Void> s2_i1 = 
ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
-            ServiceInstance<Void> s2_i2 = 
ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
-
-            ServiceDiscovery<Void> discovery = 
ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
-            closeables.add(discovery);
-            discovery.start();
-
-            discovery.registerService(s1_i1);
-            discovery.registerService(s1_i2);
-            discovery.registerService(s2_i1);
-            discovery.registerService(s2_i2);
-
-            Assert.assertEquals(Sets.newHashSet(discovery.queryForNames()), 
Sets.newHashSet(SERVICE_ONE, SERVICE_TWO));
-
-            List<ServiceInstance<Void>> list = Lists.newArrayList();
-            list.add(s1_i1);
-            list.add(s1_i2);
-            Collections.sort(list, comparator);
-            List<ServiceInstance<Void>> queriedInstances = 
Lists.newArrayList(discovery.queryForInstances(SERVICE_ONE));
-            Collections.sort(queriedInstances, comparator);
-            Assert.assertEquals(queriedInstances, list, String.format("Not 
equal l: %s - d: %s", list, queriedInstances));
-
-            list.clear();
-
-            list.add(s2_i1);
-            list.add(s2_i2);
-            Collections.sort(list, comparator);
-            queriedInstances = 
Lists.newArrayList(discovery.queryForInstances(SERVICE_TWO));
-            Collections.sort(queriedInstances, comparator);
-            Assert.assertEquals(queriedInstances, list, String.format("Not 
equal 2: %s - d: %s", list, queriedInstances));
-        }
-        finally
-        {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
-        }
-    }
-
-    @Test
-    public void testBasic() throws Exception
-    {
-        List<Closeable> closeables = Lists.newArrayList();
-        try
-        {
-            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
-            closeables.add(client);
-            client.start();
-
-            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
-            closeables.add(discovery);
-            discovery.start();
-
-            Assert.assertEquals(discovery.queryForNames(), 
Arrays.asList("test"));
-
-            List<ServiceInstance<String>> list = Lists.newArrayList();
-            list.add(instance);
-            Assert.assertEquals(discovery.queryForInstances("test"), list);
-        }
-        finally
-        {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
-        }
-    }
-
-    @Test
-    public void testNoServerOnStart() throws Exception
-    {
-        server.stop();
-        List<Closeable> closeables = Lists.newArrayList();
-        try
-        {
-            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
-            closeables.add(client);
-            client.start();
-
-            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
-            closeables.add(discovery);
-            discovery.start();
-
-            server.restart();
-            Assert.assertEquals(discovery.queryForNames(), 
Arrays.asList("test"));
-
-            List<ServiceInstance<String>> list = Lists.newArrayList();
-            list.add(instance);
-            Assert.assertEquals(discovery.queryForInstances("test"), list);
-        }
-        finally
-        {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
-        }
-    }
-
-    // CURATOR-164
-    @Test
-    public void testUnregisterService() throws Exception
-    {
-        final String name = "name";
-
-        final CountDownLatch restartLatch = new CountDownLatch(1);
-        List<Closeable> closeables = Lists.newArrayList();
-
-        InstanceSerializer<String> slowSerializer = new 
JsonInstanceSerializer<String>(String.class)
-        {
-            private boolean first = true;
-
-            @Override
-            public byte[] serialize(ServiceInstance<String> instance) throws 
Exception
-            {
-                if ( first )
-                {
-                    System.out.println("Serializer first registration.");
-                    first = false;
-                }
-                else
-                {
-                    System.out.println("Waiting for reconnect to finish.");
-                    // Simulate the serialize method being slow.
-                    // This could just be a timed wait, but that's kind of 
non-deterministic.
-                    restartLatch.await();
-                }
-                return super.serialize(instance);
-            }
-        };
-
-        try
-        {
-            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
-            closeables.add(client);
-            client.start();
-
-            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
-            ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
-            closeables.add(discovery);
-            discovery.start();
-
-            Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), 
"Service should start registered.");
-
-            server.stop();
-            server.restart();
-
-            discovery.unregisterService(instance);
-            restartLatch.countDown();
-
-            new Timing().sleepABit(); // Wait for the rest of registration to 
finish.
-
-            Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), 
"Service should have unregistered.");
-        }
-        finally
-        {
-            Collections.reverse(closeables);
-            for ( Closeable c : closeables )
-            {
-                CloseableUtils.closeQuietly(c);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/f489dfeb/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
----------------------------------------------------------------------
diff --git 
a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
new file mode 100644
index 0000000..2808c5c
--- /dev/null
+++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.discovery.details;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
+import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+public class TestServiceDiscovery extends BaseClassForTests
+{
+    private static final Comparator<ServiceInstance<Void>> comparator = new 
Comparator<ServiceInstance<Void>>()
+    {
+        @Override
+        public int compare(ServiceInstance<Void> o1, ServiceInstance<Void> o2)
+        {
+            return o1.getId().compareTo(o2.getId());
+        }
+    };
+
+    @Test
+    public void testCrashedServerMultiInstances() throws Exception
+    {
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            Timing timing = new Timing();
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            final Semaphore semaphore = new Semaphore(0);
+            ServiceInstance<String> instance1 = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceInstance<String> instance2 = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
+            ServiceDiscovery<String> discovery = new 
ServiceDiscoveryImpl<String>(client, "/test", new 
JsonInstanceSerializer<String>(String.class), instance1, false)
+            {
+                @Override
+                protected void internalRegisterService(ServiceInstance<String> 
service) throws Exception
+                {
+                    super.internalRegisterService(service);
+                    semaphore.release();
+                }
+            };
+            closeables.add(discovery);
+            discovery.start();
+            discovery.registerService(instance2);
+
+            timing.acquireSemaphore(semaphore, 2);
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
+
+            KillSession.kill(client.getZookeeperClient().getZooKeeper(), 
server.getConnectString());
+            server.stop();
+
+            server.restart();
+            closeables.add(server);
+
+            timing.acquireSemaphore(semaphore, 2);
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
+        }
+        finally
+        {
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    @Test
+    public void testCrashedServer() throws Exception
+    {
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            Timing timing = new Timing();
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            final Semaphore semaphore = new Semaphore(0);
+            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = new 
ServiceDiscoveryImpl<String>(client, "/test", new 
JsonInstanceSerializer<String>(String.class), instance, false)
+            {
+                @Override
+                protected void internalRegisterService(ServiceInstance<String> 
service) throws Exception
+                {
+                    super.internalRegisterService(service);
+                    semaphore.release();
+                }
+            };
+            closeables.add(discovery);
+            discovery.start();
+
+            timing.acquireSemaphore(semaphore);
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+
+            KillSession.kill(client.getZookeeperClient().getZooKeeper(), 
server.getConnectString());
+            server.stop();
+
+            server.restart();
+            closeables.add(server);
+
+            timing.acquireSemaphore(semaphore);
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+        }
+        finally
+        {
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    @Test
+    public void testCrashedInstance() throws Exception
+    {
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            Timing timing = new Timing();
+
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = new 
ServiceDiscoveryImpl<String>(client, "/test", new 
JsonInstanceSerializer<String>(String.class), instance, false);
+            closeables.add(discovery);
+            discovery.start();
+
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+
+            KillSession.kill(client.getZookeeperClient().getZooKeeper(), 
server.getConnectString());
+            Thread.sleep(timing.multiple(1.5).session());
+
+            Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    @Test
+    public void testMultipleInstances() throws Exception
+    {
+        final String SERVICE_ONE = "one";
+        final String SERVICE_TWO = "two";
+
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<Void> s1_i1 = 
ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
+            ServiceInstance<Void> s1_i2 = 
ServiceInstance.<Void>builder().name(SERVICE_ONE).build();
+            ServiceInstance<Void> s2_i1 = 
ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
+            ServiceInstance<Void> s2_i2 = 
ServiceInstance.<Void>builder().name(SERVICE_TWO).build();
+
+            ServiceDiscovery<Void> discovery = 
ServiceDiscoveryBuilder.builder(Void.class).client(client).basePath("/test").build();
+            closeables.add(discovery);
+            discovery.start();
+
+            discovery.registerService(s1_i1);
+            discovery.registerService(s1_i2);
+            discovery.registerService(s2_i1);
+            discovery.registerService(s2_i2);
+
+            Assert.assertEquals(Sets.newHashSet(discovery.queryForNames()), 
Sets.newHashSet(SERVICE_ONE, SERVICE_TWO));
+
+            List<ServiceInstance<Void>> list = Lists.newArrayList();
+            list.add(s1_i1);
+            list.add(s1_i2);
+            Collections.sort(list, comparator);
+            List<ServiceInstance<Void>> queriedInstances = 
Lists.newArrayList(discovery.queryForInstances(SERVICE_ONE));
+            Collections.sort(queriedInstances, comparator);
+            Assert.assertEquals(queriedInstances, list, String.format("Not 
equal l: %s - d: %s", list, queriedInstances));
+
+            list.clear();
+
+            list.add(s2_i1);
+            list.add(s2_i2);
+            Collections.sort(list, comparator);
+            queriedInstances = 
Lists.newArrayList(discovery.queryForInstances(SERVICE_TWO));
+            Collections.sort(queriedInstances, comparator);
+            Assert.assertEquals(queriedInstances, list, String.format("Not 
equal 2: %s - d: %s", list, queriedInstances));
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+            closeables.add(discovery);
+            discovery.start();
+
+            Assert.assertEquals(discovery.queryForNames(), 
Collections.singletonList("test"));
+
+            List<ServiceInstance<String>> list = Lists.newArrayList();
+            list.add(instance);
+            Assert.assertEquals(discovery.queryForInstances("test"), list);
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    @Test
+    public void testNoServerOnStart() throws Exception
+    {
+        server.stop();
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+            closeables.add(discovery);
+            discovery.start();
+
+            server.restart();
+            Assert.assertEquals(discovery.queryForNames(), 
Collections.singletonList("test"));
+
+            List<ServiceInstance<String>> list = Lists.newArrayList();
+            list.add(instance);
+            Assert.assertEquals(discovery.queryForInstances("test"), list);
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    // CURATOR-164
+    @Test
+    public void testUnregisterService() throws Exception
+    {
+        final String name = "name";
+
+        final CountDownLatch restartLatch = new CountDownLatch(1);
+        List<Closeable> closeables = Lists.newArrayList();
+
+        InstanceSerializer<String> slowSerializer = new 
JsonInstanceSerializer<String>(String.class)
+        {
+            private boolean first = true;
+
+            @Override
+            public byte[] serialize(ServiceInstance<String> instance) throws 
Exception
+            {
+                if ( first )
+                {
+                    System.out.println("Serializer first registration.");
+                    first = false;
+                }
+                else
+                {
+                    System.out.println("Waiting for reconnect to finish.");
+                    // Simulate the serialize method being slow.
+                    // This could just be a timed wait, but that's kind of 
non-deterministic.
+                    restartLatch.await();
+                }
+                return super.serialize(instance);
+            }
+        };
+
+        try
+        {
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name(name).port(10064).build();
+            ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).serializer(slowSerializer).watchInstances(true).build();
+            closeables.add(discovery);
+            discovery.start();
+
+            Assert.assertFalse(discovery.queryForInstances(name).isEmpty(), 
"Service should start registered.");
+
+            server.stop();
+            server.restart();
+
+            discovery.unregisterService(instance);
+            restartLatch.countDown();
+
+            new Timing().sleepABit(); // Wait for the rest of registration to 
finish.
+
+            Assert.assertTrue(discovery.queryForInstances(name).isEmpty(), 
"Service should have unregistered.");
+        }
+        finally
+        {
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+
+    @Test
+    public void testCleaning() throws Exception
+    {
+        System.setProperty("curator-discovery-clean-threshold-ms", "10");
+        List<Closeable> closeables = Lists.newArrayList();
+        try
+        {
+            CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceInstance<String> instance = 
ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+            closeables.add(discovery);
+            discovery.start();
+            discovery.unregisterService(instance);
+
+            Thread.sleep(100);
+
+            discovery.queryForNames();  // causes a clean
+            
Assert.assertEquals(((ServiceDiscoveryImpl)discovery).debugServicesQty(), 0);
+        }
+        finally
+        {
+            System.clearProperty("curator-discovery-clean-threshold-ms");
+            Collections.reverse(closeables);
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
+}

Reply via email to