Repository: ambari Updated Branches: refs/heads/branch-2.6 e6fa2279b -> 55d0db4af
AMBARI-21898. Property provider in-memory maps are refreshed too slowly after config updates. (swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/55d0db4a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/55d0db4a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/55d0db4a Branch: refs/heads/branch-2.6 Commit: 55d0db4afdccc980ac58c194e23344d7a8266a03 Parents: e6fa227 Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Thu Sep 7 11:25:47 2017 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Thu Sep 7 11:25:58 2017 -0700 ---------------------------------------------------------------------- .../internal/AbstractProviderModule.java | 114 ++++++++----------- .../org/apache/ambari/server/state/Cluster.java | 5 + .../server/state/cluster/ClusterImpl.java | 14 ++- 3 files changed, 63 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/55d0db4a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java index 5fc4c31..4636a3b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java @@ -51,7 +51,6 @@ import org.apache.ambari.server.controller.spi.NoSuchResourceException; import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.PropertyProvider; import org.apache.ambari.server.controller.spi.ProviderModule; -import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.ResourceProvider; import org.apache.ambari.server.controller.spi.SystemException; @@ -67,6 +66,9 @@ import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.DesiredConfig; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,10 +86,6 @@ public abstract class AbstractProviderModule implements ProviderModule, private static final int PROPERTY_REQUEST_CONNECT_TIMEOUT = 5000; private static final int PROPERTY_REQUEST_READ_TIMEOUT = 10000; - private static final String CLUSTER_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("Clusters", "cluster_name"); - private static final String HOST_COMPONENT_CLUSTER_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "cluster_name"); - private static final String HOST_COMPONENT_HOST_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "host_name"); - private static final String HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "component_name"); private static final String GANGLIA_SERVER = "GANGLIA_SERVER"; private static final String METRIC_SERVER = "METRICS_COLLECTOR"; private static final String PROPERTIES_CATEGORY = "properties"; @@ -219,6 +217,11 @@ public abstract class AbstractProviderModule implements ProviderModule, */ private final Map<Resource.Type, List<PropertyProvider>> propertyProviders = new HashMap<Resource.Type, List<PropertyProvider>>(); + /* + * TODO: Instantiation for the concrete impl of this class is not done through + * dependency injector (guice) so none of these field initialization + * are going to work unless refactoring is complete. + */ @Inject AmbariManagementController managementController; @@ -243,6 +246,8 @@ public abstract class AbstractProviderModule implements ProviderModule, @Inject protected AmbariEventPublisher eventPublisher; + @Inject + private Clusters clusters; /** * The map of host components. @@ -257,8 +262,7 @@ public abstract class AbstractProviderModule implements ProviderModule, /** * JMX ports read from the configs */ - private final Map<String, ConcurrentMap<String, ConcurrentMap<String, String>> >jmxPortMap = - Collections.synchronizedMap(new HashMap<String, ConcurrentMap<String, ConcurrentMap<String, String>>>()); + private final Map<String, ConcurrentMap<String, ConcurrentMap<String, String>>> jmxPortMap = new ConcurrentHashMap<>(1); private volatile boolean initialized = false; @@ -292,6 +296,10 @@ public abstract class AbstractProviderModule implements ProviderModule, if (null == metricsCollectorHAManager && null != managementController) { metricsCollectorHAManager = managementController.getMetricsCollectorHAManager(); } + + if (null == clusters && null != managementController) { + clusters = managementController.getClusters(); + } } @@ -515,17 +523,19 @@ public abstract class AbstractProviderModule implements ProviderModule, @Override public String getPort(String clusterName, String componentName, String hostName, boolean httpsEnabled) throws SystemException { - // Parent map need not be synchronized - ConcurrentMap<String, ConcurrentMap<String, String>> clusterJmxPorts = jmxPortMap.get(clusterName); - if (clusterJmxPorts == null) { + ConcurrentMap<String, ConcurrentMap<String, String>> clusterJmxPorts; + // Still need double check to ensure single init + if (!jmxPortMap.containsKey(clusterName)) { synchronized (jmxPortMap) { - clusterJmxPorts = jmxPortMap.get(clusterName); - if (clusterJmxPorts == null) { - clusterJmxPorts = new ConcurrentHashMap<String, ConcurrentMap<String, String>>(); + if (!jmxPortMap.containsKey(clusterName)) { + clusterJmxPorts = new ConcurrentHashMap<>(); jmxPortMap.put(clusterName, clusterJmxPorts); } } } + + clusterJmxPorts = jmxPortMap.get(clusterName); + Service.Type service = componentServiceMap.get(componentName); if (service != null) { @@ -851,55 +861,40 @@ public abstract class AbstractProviderModule implements ProviderModule, if (initialized) { synchronized (this) { if (initialized) { + LOG.info("Resetting property provider maps to reflect changes in " + + "cluster state"); initialized = false; } } } } + // TODO: Fix for multi-service feature support (trunk) + // Called from a synchornized block ! private void initProviderMaps() throws SystemException { - ResourceProvider provider = getResourceProvider(Resource.Type.Cluster); - - Set<String> propertyIds = new HashSet<String>(); - propertyIds.add(ClusterResourceProvider.CLUSTER_NAME_PROPERTY_ID); - - Map<String, String> requestInfoProperties = new HashMap<String, String>(); - requestInfoProperties.put(ClusterResourceProvider.GET_IGNORE_PERMISSIONS_PROPERTY_ID, "true"); - - Request request = PropertyHelper.getReadRequest(propertyIds, - requestInfoProperties, null, null, null); - - try { - jmxPortMap.clear(); - Set<Resource> clusters = provider.getResources(request, null); - - clusterHostComponentMap = new HashMap<String, Map<String, String>>(); - clusterGangliaCollectorMap = new HashMap<String, String>(); + jmxPortMap.clear(); + clusterHostComponentMap = new HashMap<>(); + clusterGangliaCollectorMap = new HashMap<>(); - for (Resource cluster : clusters) { - - String clusterName = (String) cluster.getPropertyValue(CLUSTER_NAME_PROPERTY_ID); - - // initialize the host component map and Ganglia server from the known hosts components... - provider = getResourceProvider(Resource.Type.HostComponent); - - request = PropertyHelper.getReadRequest(HOST_COMPONENT_HOST_NAME_PROPERTY_ID, - HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID); - - Predicate predicate = new PredicateBuilder().property(HOST_COMPONENT_CLUSTER_NAME_PROPERTY_ID). - equals(clusterName).toPredicate(); + Map<String, Cluster> clusterMap = clusters.getClusters(); + if (MapUtils.isEmpty(clusterMap)) { + return; + } - Set<Resource> hostComponents = provider.getResources(request, predicate); - Map<String, String> hostComponentMap = clusterHostComponentMap.get(clusterName); + for (Cluster cluster : clusterMap.values()) { + String clusterName = cluster.getClusterName(); + Map<String, String> hostComponentMap = clusterHostComponentMap.get(clusterName); - if (hostComponentMap == null) { - hostComponentMap = new HashMap<String, String>(); - clusterHostComponentMap.put(clusterName, hostComponentMap); - } + if (hostComponentMap == null) { + hostComponentMap = new HashMap<>(); + clusterHostComponentMap.put(clusterName, hostComponentMap); + } - for (Resource hostComponent : hostComponents) { - String componentName = (String) hostComponent.getPropertyValue(HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID); - String hostName = (String) hostComponent.getPropertyValue(HOST_COMPONENT_HOST_NAME_PROPERTY_ID); + List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(); + if (!CollectionUtils.isEmpty(serviceComponentHosts)) { + for (ServiceComponentHost sch : serviceComponentHosts) { + String componentName = sch.getServiceComponentName(); + String hostName = sch.getHostName(); hostComponentMap.put(componentName, hostName); @@ -909,26 +904,11 @@ public abstract class AbstractProviderModule implements ProviderModule, } if (componentName.equals(METRIC_SERVER)) { // If current collector host is null or if the host or the host component not live - // Update clusterMetricCollectorMap. + // Update clusterMetricCollectorMap. metricsCollectorHAManager.addCollectorHost(clusterName, hostName); } } } - } catch (UnsupportedPropertyException e) { - if (LOG.isErrorEnabled()) { - LOG.error("Caught UnsupportedPropertyException while trying to get the host mappings.", e); - } - throw new SystemException("An exception occurred while initializing the host mappings: " + e, e); - } catch (NoSuchResourceException e) { - if (LOG.isErrorEnabled()) { - LOG.error("Caught NoSuchResourceException exception while trying to get the host mappings.", e); - } - throw new SystemException("An exception occurred while initializing the host mappings: " + e, e); - } catch (NoSuchParentResourceException e) { - if (LOG.isErrorEnabled()) { - LOG.error("Caught NoSuchParentResourceException exception while trying to get the host mappings.", e); - } - throw new SystemException("An exception occurred while initializing the host mappings: " + e, e); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/55d0db4a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index ee18fdf..15efcd2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -119,6 +119,11 @@ public interface Cluster { List<ServiceComponentHost> getServiceComponentHosts(String serviceName, String componentName); /** + * Get all ServiceComponentHosts for this cluster. + */ + List<ServiceComponentHost> getServiceComponentHosts(); + + /** * Get all hosts associated with this cluster. * * @return collection of hosts that are associated with this cluster http://git-wip-us.apache.org/repos/asf/ambari/blob/55d0db4a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 2f858b8..68e557a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import javax.annotation.Nullable; @@ -558,8 +557,17 @@ public class ClusterImpl implements Cluster { throw new ServiceComponentHostNotFoundException(getClusterName(), serviceName, serviceComponentName, hostname); } - return serviceComponentHosts.get(serviceName).get(serviceComponentName).get( - hostname); + return serviceComponentHosts.get(serviceName).get(serviceComponentName).get(hostname); + } + + public List<ServiceComponentHost> getServiceComponentHosts() { + List<ServiceComponentHost> serviceComponentHosts = new ArrayList<>(); + if (!serviceComponentHostsByHost.isEmpty()) { + for (List<ServiceComponentHost> schList : serviceComponentHostsByHost.values()) { + serviceComponentHosts.addAll(schList); + } + } + return Collections.unmodifiableList(serviceComponentHosts); } @Override