Repository: ambari Updated Branches: refs/heads/trunk 8bb25bed2 -> 43511187b
AMBARI-12720. Blueprint Logical Request stuck in waiting mode during large cluster deployments. (rnettleton) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/43511187 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/43511187 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/43511187 Branch: refs/heads/trunk Commit: 43511187b140b32ab400ea13dce07300f0864c5b Parents: 8bb25be Author: Bob Nettleton <rnettle...@hortonworks.com> Authored: Thu Aug 13 18:19:02 2015 -0400 Committer: Bob Nettleton <rnettle...@hortonworks.com> Committed: Thu Aug 13 18:19:02 2015 -0400 ---------------------------------------------------------------------- .../ambari/server/topology/TopologyManager.java | 77 +++++++++++--------- 1 file changed, 42 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/43511187/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index 8e7fb7f..5f3bb9d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -150,47 +150,49 @@ public class TopologyManager { boolean matchedToRequest = false; String hostName = host.getHostName(); - synchronized(reservedHosts) { - if (reservedHosts.containsKey(hostName)) { - LogicalRequest request = reservedHosts.remove(hostName); - HostOfferResponse response = request.offer(host); - if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) { - throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " + 
hostName); - } + // The lock ordering in this method must always be the same ordering as TopologyManager.processRequest + // TODO: Locking strategies for TopologyManager should be reviewed and possibly rewritten in a future release + synchronized (availableHosts) { + synchronized(reservedHosts) { + if (reservedHosts.containsKey(hostName)) { + LogicalRequest request = reservedHosts.remove(hostName); + HostOfferResponse response = request.offer(host); + if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) { + throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " + hostName); + } - LOG.info("TopologyManager.onHostRegistered: processing accepted host offer for reserved host = {}", hostName); - processAcceptedHostOffer(getClusterTopology(request.getClusterName()), response, host); - matchedToRequest = true; + LOG.info("TopologyManager.onHostRegistered: processing accepted host offer for reserved host = {}", hostName); + processAcceptedHostOffer(getClusterTopology(request.getClusterName()), response, host); + matchedToRequest = true; + } } - } - // can be true if host was reserved - if (! matchedToRequest) { - synchronized (outstandingRequests) { - Iterator<LogicalRequest> outstandingRequestIterator = outstandingRequests.iterator(); - while (! 
matchedToRequest && outstandingRequestIterator.hasNext()) { - LogicalRequest request = outstandingRequestIterator.next(); - HostOfferResponse hostOfferResponse = request.offer(host); - switch (hostOfferResponse.getAnswer()) { - case ACCEPTED: - matchedToRequest = true; - LOG.info("TopologyManager.onHostRegistered: processing accepted host offer for matched host = {}", hostName); - processAcceptedHostOffer(getClusterTopology(request.getClusterName()), hostOfferResponse, host); - break; - case DECLINED_DONE: - LOG.info("TopologyManager.onHostRegistered: DECLINED_DONE received for host = {}", hostName); - outstandingRequestIterator.remove(); - break; - case DECLINED_PREDICATE: - LOG.info("TopologyManager.onHostRegistered: DECLINED_PREDICATE received for host = {}", hostName); - break; + // can be true if host was reserved + if (!matchedToRequest) { + synchronized (outstandingRequests) { + Iterator<LogicalRequest> outstandingRequestIterator = outstandingRequests.iterator(); + while (!matchedToRequest && outstandingRequestIterator.hasNext()) { + LogicalRequest request = outstandingRequestIterator.next(); + HostOfferResponse hostOfferResponse = request.offer(host); + switch (hostOfferResponse.getAnswer()) { + case ACCEPTED: + matchedToRequest = true; + LOG.info("TopologyManager.onHostRegistered: processing accepted host offer for matched host = {}", hostName); + processAcceptedHostOffer(getClusterTopology(request.getClusterName()), hostOfferResponse, host); + break; + case DECLINED_DONE: + LOG.info("TopologyManager.onHostRegistered: DECLINED_DONE received for host = {}", hostName); + outstandingRequestIterator.remove(); + break; + case DECLINED_PREDICATE: + LOG.info("TopologyManager.onHostRegistered: DECLINED_PREDICATE received for host = {}", hostName); + break; + } } } } - } - if (!matchedToRequest) { - synchronized (availableHosts) { + if (!matchedToRequest) { boolean addToAvailableList = true; for (HostImpl registered : availableHosts) { if (registered.getHostId() == 
host.getHostId()) { @@ -316,6 +318,9 @@ public class TopologyManager { boolean requestHostComplete = false; //todo: overall synchronization. Currently we have nested synchronization here + + // The lock ordering in this method must always be the same ordering as TopologyManager.onHostRegistered + // TODO: Locking strategies for TopologyManager should be reviewed and possibly rewritten in a future release synchronized(availableHosts) { Iterator<HostImpl> hostIterator = availableHosts.iterator(); while (! requestHostComplete && hostIterator.hasNext()) { @@ -368,7 +373,9 @@ public class TopologyManager { // not all required hosts have been matched (see earlier comment regarding outstanding logical requests) LOG.info("TopologyManager.processRequest: not all required hosts have been matched, so adding LogicalRequest ID = {} to outstanding requests", logicalRequest.getRequestId()); - outstandingRequests.add(logicalRequest); + synchronized (outstandingRequests) { + outstandingRequests.add(logicalRequest); + } } } return logicalRequest;