adjust synch model for controller should not synch on class as that is used for things like getExecutionContext (and so can deadlock)
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/ceee1c0d Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/ceee1c0d Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/ceee1c0d Branch: refs/heads/master Commit: ceee1c0dbb4f9d38e253e88abd78f0952b6f42c9 Parents: e3db4f2 Author: Alex Heneveld <[email protected]> Authored: Thu Jun 18 08:55:16 2015 -0700 Committer: Alex Heneveld <[email protected]> Committed: Wed Jun 24 00:40:32 2015 -0700 ---------------------------------------------------------------------- .../entity/proxy/AbstractControllerImpl.java | 141 ++++++++++--------- 1 file changed, 76 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ceee1c0d/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java ---------------------------------------------------------------------- diff --git a/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java b/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java index 297364f..a79b98b 100644 --- a/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java +++ b/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java @@ -68,7 +68,7 @@ import com.google.common.net.HostAndPort; public abstract class AbstractControllerImpl extends SoftwareProcessImpl implements AbstractController { // TODO Should review synchronization model. Currently, all changes to the serverPoolTargets - // (and checking for potential changes) is done while synchronized on this. That means it + // (and checking for potential changes) is done while synchronized on serverPoolAddresses. That means it // will also call update/reload while holding the lock. This is "conservative", but means // sub-classes need to be extremely careful about any additional synchronization and of // their implementations of update/reconfigureService/reload. @@ -79,7 +79,8 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme protected volatile boolean updateNeeded = true; protected AbstractMembershipTrackingPolicy serverPoolMemberTrackerPolicy; - protected Set<String> serverPoolAddresses = Sets.newLinkedHashSet(); + // final because this is the synch target + final protected Set<String> serverPoolAddresses = Sets.newLinkedHashSet(); protected Map<Entity,String> serverPoolTargets = Maps.newLinkedHashMap(); public AbstractControllerImpl() { @@ -356,17 +357,19 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme */ protected abstract void reconfigureService(); - public synchronized void updateNeeded() { - if (updateNeeded) return; - updateNeeded = true; - LOG.debug("queueing an update-needed task for "+this+"; update will occur shortly"); - Entities.submit(this, Tasks.builder().name("update-needed").body(new Runnable() { - @Override - public void run() { - if (updateNeeded) - AbstractControllerImpl.this.update(); - } - }).build()); + public void updateNeeded() { + synchronized (serverPoolAddresses) { + if (updateNeeded) return; + updateNeeded = true; + LOG.debug("queueing an update-needed task for "+this+"; update will occur shortly"); + Entities.submit(this, Tasks.builder().name("update-needed").body(new Runnable() { + @Override + public void run() { + if (updateNeeded) + AbstractControllerImpl.this.update(); + } + }).build()); + } } @Override @@ -381,30 +384,34 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme } } - public synchronized Task<?> updateAsync() { - Task<?> result = null; - if (!isActive()) updateNeeded = true; - else { - updateNeeded = false; - LOG.debug("Updating {} in response to changes", this); - LOG.info("Updating {}, server pool targets {}", new Object[] {this, getAttribute(SERVER_POOL_TARGETS)}); - reconfigureService(); - LOG.debug("Reloading {} in response to changes", this); - // reload should happen synchronously - result = invoke(RELOAD); + public Task<?> updateAsync() { + synchronized (serverPoolAddresses) { + Task<?> result = null; + if (!isActive()) updateNeeded = true; + else { + updateNeeded = false; + LOG.debug("Updating {} in response to changes", this); + LOG.info("Updating {}, server pool targets {}", new Object[] {this, getAttribute(SERVER_POOL_TARGETS)}); + reconfigureService(); + LOG.debug("Reloading {} in response to changes", this); + // reload should happen synchronously + result = invoke(RELOAD); + } + return result; } - return result; } - protected synchronized void onServerPoolMemberChanged(Entity member) { - if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of {} which is in locations {}", + protected void onServerPoolMemberChanged(Entity member) { + synchronized (serverPoolAddresses) { + if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of {} which is in locations {}", new Object[] {this, member, member.getLocations()}); - if (belongsInServerPool(member)) { - addServerPoolMember(member); - } else { - removeServerPoolMember(member); + if (belongsInServerPool(member)) { + addServerPoolMember(member); + } else { + removeServerPoolMember(member); + } + if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member); } - if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member); } protected boolean belongsInServerPool(Entity member) { @@ -420,45 +427,49 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme return true; } - protected synchronized void addServerPoolMember(Entity member) { - String oldAddress = getAttribute(SERVER_POOL_TARGETS).get(member); - String newAddress = getAddressOfEntity(member); - if (Objects.equal(newAddress, oldAddress)) { - if (LOG.isTraceEnabled()) - if (LOG.isTraceEnabled()) LOG.trace("Ignoring unchanged address {}", oldAddress); - return; - } else if (newAddress == null) { - LOG.info("Removing from {}, member {} with old address {}, because inferred address is now null", new Object[] {this, member, oldAddress}); - } else { - if (oldAddress != null) { - LOG.info("Replacing in {}, member {} with old address {}, new address {}", new Object[] {this, member, oldAddress, newAddress}); + protected void addServerPoolMember(Entity member) { + synchronized (serverPoolAddresses) { + String oldAddress = getAttribute(SERVER_POOL_TARGETS).get(member); + String newAddress = getAddressOfEntity(member); + if (Objects.equal(newAddress, oldAddress)) { + if (LOG.isTraceEnabled()) + if (LOG.isTraceEnabled()) LOG.trace("Ignoring unchanged address {}", oldAddress); + return; + } else if (newAddress == null) { + LOG.info("Removing from {}, member {} with old address {}, because inferred address is now null", new Object[] {this, member, oldAddress}); } else { - LOG.info("Adding to {}, new member {} with address {}", new Object[] {this, member, newAddress}); + if (oldAddress != null) { + LOG.info("Replacing in {}, member {} with old address {}, new address {}", new Object[] {this, member, oldAddress, newAddress}); + } else { + LOG.info("Adding to {}, new member {} with address {}", new Object[] {this, member, newAddress}); + } } + + if (Objects.equal(oldAddress, newAddress)) { + if (LOG.isTraceEnabled()) LOG.trace("For {}, ignoring change in member {} because address still {}", new Object[] {this, member, newAddress}); + return; + } + + // TODO this does it synchronously; an async method leaning on `updateNeeded` and `update` might + // be more appropriate, especially when this is used in a listener + MapAttribute.put(this, SERVER_POOL_TARGETS, member, newAddress); + updateAsync(); } - - if (Objects.equal(oldAddress, newAddress)) { - if (LOG.isTraceEnabled()) LOG.trace("For {}, ignoring change in member {} because address still {}", new Object[] {this, member, newAddress}); - return; - } - - // TODO this does it synchronously; an async method leaning on `updateNeeded` and `update` might - // be more appropriate, especially when this is used in a listener - MapAttribute.put(this, SERVER_POOL_TARGETS, member, newAddress); - updateAsync(); } - protected synchronized void removeServerPoolMember(Entity member) { - if (!getAttribute(SERVER_POOL_TARGETS).containsKey(member)) { - if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't have member {}", new Object[] {this, member}); - return; + protected void removeServerPoolMember(Entity member) { + synchronized (serverPoolAddresses) { + if (!getAttribute(SERVER_POOL_TARGETS).containsKey(member)) { + if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't have member {}", new Object[] {this, member}); + return; + } + + String address = MapAttribute.remove(this, SERVER_POOL_TARGETS, member); + + LOG.info("Removing from {}, member {} with address {}", new Object[] {this, member, address}); + + updateAsync(); } - - String address = MapAttribute.remove(this, SERVER_POOL_TARGETS, member); - - LOG.info("Removing from {}, member {} with address {}", new Object[] {this, member, address}); - - updateAsync(); } protected String getAddressOfEntity(Entity member) {
