Repository: ambari Updated Branches: refs/heads/trunk afb8b3b71 -> 41f54cc01
AMBARI-11011. all_hosts list is out of sync with all_ipv4_ips and all_racks when hosts are being added progressively Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/41f54cc0 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/41f54cc0 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/41f54cc0 Branch: refs/heads/trunk Commit: 41f54cc0163430439e45b0f1fe8b3c2b88222c90 Parents: afb8b3b Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Thu May 7 18:27:40 2015 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Thu May 7 18:27:40 2015 -0700 ---------------------------------------------------------------------- .../apache/ambari/server/utils/StageUtils.java | 61 ++++++++++++++------ 1 file changed, 43 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/41f54cc0/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java index 68a45ec..ea6c552 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.utils; +import org.apache.commons.lang.StringUtils; import com.google.common.base.Joiner; import com.google.gson.Gson; import com.google.inject.Inject; @@ -260,23 +261,25 @@ public class StageUtils { portsList.add(currentPingPort == null ? DEFAULT_PING_PORT : currentPingPort); String rackInfo = host.getRackInfo(); - rackList.add(rackInfo == null ? DEFAULT_RACK : rackInfo ); + rackList.add(StringUtils.isEmpty(rackInfo) ? DEFAULT_RACK : rackInfo ); String iPv4 = host.getIPv4(); - ipV4List.add(iPv4 == null ? DEFAULT_IPV4_ADDRESS : iPv4 ); + ipV4List.add(StringUtils.isEmpty(iPv4) ? DEFAULT_IPV4_ADDRESS : iPv4 ); } // add hosts from topology manager Map<String, Collection<String>> pendingHostComponents = topologyManager.getProjectedTopology(); for (String hostname : pendingHostComponents.keySet()) { - if (! hostsSet.contains(hostname)) { + if (!hostsSet.contains(hostname)) { hostsSet.add(hostname); - // this is only set in heartbeat handler and since these hosts haven't yet been provisioned, set the default portsList.add(DEFAULT_PING_PORT); + rackList.add(DEFAULT_RACK); + ipV4List.add(DEFAULT_IPV4_ADDRESS); } } List<String> hostsList = new ArrayList<String>(hostsSet); + Map<String, String> additionalComponentToClusterInfoKeyMap = new HashMap<String, String>(); // Fill hosts for services Map<String, SortedSet<Integer>> hostRolesInfo = new HashMap<String, SortedSet<Integer>>(); @@ -290,8 +293,12 @@ public class StageUtils { String componentName = serviceComponent.getName(); String roleName = componentToClusterInfoKeyMap.get(componentName); + if(null == roleName) { + roleName = additionalComponentToClusterInfoKeyMap.get(componentName); + } if (null == roleName && !serviceComponent.isClientComponent()) { roleName = componentName.toLowerCase() + "_hosts"; + additionalComponentToClusterInfoKeyMap.put(componentName, roleName); } String decomRoleName = decommissionedToClusterInfoKeyMap.get(componentName); @@ -341,23 +348,41 @@ public class StageUtils { for (String hostComponent : hostComponents) { String roleName = componentToClusterInfoKeyMap.get(hostComponent); - SortedSet<Integer> hostsForComponentsHost = hostRolesInfo.get(roleName); - - if (hostsForComponentsHost == null) { - hostsForComponentsHost = new TreeSet<Integer>(); - hostRolesInfo.put(roleName, hostsForComponentsHost); + if (null == roleName) { + roleName = additionalComponentToClusterInfoKeyMap.get(hostComponent); + } + if (null == roleName) { + // even though all mappings are being added, componentToClusterInfoKeyMap is + // a higher priority lookup + for (Service service : cluster.getServices().values()) { + for (ServiceComponent sc : service.getServiceComponents().values()) { + if (!sc.isClientComponent() && sc.getName().equals(hostComponent)) { + roleName = hostComponent.toLowerCase() + "_hosts"; + additionalComponentToClusterInfoKeyMap.put(hostComponent, roleName); + } + } + } } - int hostIndex = hostsList.indexOf(hostname); - if (hostIndex != -1) { - if (! hostsForComponentsHost.contains(hostIndex)) { - hostsForComponentsHost.add(hostIndex); + if (roleName != null) { + SortedSet<Integer> hostsForComponentsHost = hostRolesInfo.get(roleName); + + if (hostsForComponentsHost == null) { + hostsForComponentsHost = new TreeSet<Integer>(); + hostRolesInfo.put(roleName, hostsForComponentsHost); + } + + int hostIndex = hostsList.indexOf(hostname); + if (hostIndex != -1) { + if (!hostsForComponentsHost.contains(hostIndex)) { + hostsForComponentsHost.add(hostIndex); + } + } else { + //todo: I don't think that this can happen + //todo: determine if it can and if so, handle properly + //todo: if it 'cant' should probably enforce invariant + throw new RuntimeException("Unable to get host index for host: " + hostname); } - } else { - //todo: I don't think that this can happen - //todo: determine if it can and if so, handle properly - //todo: if it 'cant' should probably enforce invariant - throw new RuntimeException("Unable to get host index for host: " + hostname); } } }