sync on holder for safety during multi-step operations
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c62b1137 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c62b1137 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c62b1137 Branch: refs/heads/CURATOR-160 Commit: c62b1137fa25104f2e24d65e467d0cfc769bd6e2 Parents: fa0c9da Author: randgalt <randg...@apache.org> Authored: Tue Apr 21 16:47:10 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Tue Apr 21 16:47:10 2015 -0500 ---------------------------------------------------------------------- .../discovery/details/ServiceDiscoveryImpl.java | 86 ++++++++++++-------- 1 file changed, 53 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/c62b1137/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index 80b012e..ba18e42 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * A mechanism to register and query service instances using ZooKeeper */ +@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -207,16 +208,19 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> { clean(); - Holder<T> holder = getOrMakeHolder(service, null); - if ( holder.state.get() == State.UNREGISTERED ) + final Holder<T> holder = getOrMakeHolder(service, null); + synchronized(holder) { - throw new Exception("Service has been unregistered: " + service); - } + if ( holder.state.get() == State.UNREGISTERED ) + { + throw new Exception("Service has been unregistered: " + service); + } - holder.service.set(service); - byte[] bytes = serializer.serialize(service); - String path = pathForInstance(service.getName(), service.getId()); - client.setData().forPath(path, bytes); + holder.service.set(service); + byte[] bytes = serializer.serialize(service); + String path = pathForInstance(service.getName(), service.getId()); + client.setData().forPath(path, bytes); + } } @VisibleForTesting @@ -455,17 +459,27 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> @VisibleForTesting ServiceInstance<T> getRegisteredService(String id) { - Holder<T> holder = services.get(id); - return ((holder != null) && (holder.state.get() == State.REGISTERED)) ? holder.service.get() : null; + final Holder<T> holder = services.get(id); + if ( holder != null ) + { + synchronized(holder) + { + return (holder.state.get() == State.REGISTERED) ? holder.service.get() : null; + } + } + return null; } private void reRegisterServices() throws Exception { - for ( Holder<T> service : services.values() ) + for ( final Holder<T> holder : services.values() ) { - if ( service.state.get() == State.REGISTERED ) + synchronized(holder) { - internalRegisterService(service.service.get()); + if ( holder.state.get() == State.REGISTERED ) + { + internalRegisterService(holder.service.get()); + } } } } @@ -529,39 +543,45 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> Iterator<Holder<T>> iterator = services.values().iterator(); while ( iterator.hasNext() ) { - Holder<T> holder = iterator.next(); - if ( holder.state.get() == State.UNREGISTERED ) + final Holder<T> holder = iterator.next(); + synchronized(holder) { - long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get(); - if ( elapsed >= CLEAN_THRESHOLD_MS ) + if ( holder.state.get() == State.UNREGISTERED ) { - iterator.remove(); + long elapsed = System.currentTimeMillis() - holder.stateChangeMs.get(); + if ( elapsed >= CLEAN_THRESHOLD_MS ) + { + iterator.remove(); + } } } } } } - private void internalUnregisterService(Holder<T> holder) throws Exception + private void internalUnregisterService(final Holder<T> holder) throws Exception { if ( holder != null ) { - holder.setState(State.UNREGISTERED); - NodeCache cache = holder.cache.getAndSet(null); - if ( cache != null ) + synchronized(holder) { - CloseableUtils.closeQuietly(cache); - } + holder.setState(State.UNREGISTERED); + NodeCache cache = holder.cache.getAndSet(null); + if ( cache != null ) + { + CloseableUtils.closeQuietly(cache); + } - ServiceInstance<T> service = holder.service.get(); - String path = pathForInstance(service.getName(), service.getId()); - try - { - client.delete().guaranteed().forPath(path); - } - catch ( KeeperException.NoNodeException ignore ) - { - // ignore + ServiceInstance<T> service = holder.service.get(); + String path = pathForInstance(service.getName(), service.getId()); + try + { + client.delete().guaranteed().forPath(path); + } + catch ( KeeperException.NoNodeException ignore ) + { + // ignore + } } } }