http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java index 9917720..fe1f338 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java @@ -17,18 +17,22 @@ */ package org.apache.ambari.server.state.configgroup; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.DuplicateResourceException; import org.apache.ambari.server.controller.ConfigGroupResponse; import org.apache.ambari.server.controller.internal.ConfigurationResourceProvider; +import org.apache.ambari.server.logging.LockFactory; import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ConfigGroupConfigMappingDAO; import org.apache.ambari.server.orm.dao.ConfigGroupDAO; @@ -44,213 +48,195 @@ import org.apache.ambari.server.orm.entities.HostEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.ConfigFactory; import org.apache.ambari.server.state.Host; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; -import com.google.inject.Inject; -import com.google.inject.Injector; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; import com.google.inject.persist.Transactional; public class ConfigGroupImpl implements ConfigGroup { private static final Logger LOG = LoggerFactory.getLogger(ConfigGroupImpl.class); - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Cluster cluster; - private ConfigGroupEntity configGroupEntity; - private Map<Long, Host> hosts; - private Map<String, Config> configurations; - private volatile boolean isPersisted = false; - - @Inject - private Gson gson; - @Inject - private ConfigGroupDAO configGroupDAO; - @Inject - private ConfigGroupConfigMappingDAO configGroupConfigMappingDAO; - @Inject - private ConfigGroupHostMappingDAO configGroupHostMappingDAO; - @Inject - private HostDAO hostDAO; - @Inject - private ClusterDAO clusterDAO; - @Inject - Clusters clusters; + private ConcurrentMap<Long, Host> m_hosts; + private ConcurrentMap<String, Config> m_configurations; + private String configGroupName; + private long configGroupId; + + /** + * This lock is required to prevent inconsistencies in internal state between + * {@link #m_hosts} and the entities stored by the {@link ConfigGroupEntity}. + */ + private final ReadWriteLock hostLock; + + /** + * A label for {@link #hostLock} to use with the {@link LockFactory}. + */ + private static final String hostLockLabel = "configurationGroupHostLock"; + + private final ConfigGroupDAO configGroupDAO; + + private final ConfigGroupConfigMappingDAO configGroupConfigMappingDAO; + + private final ConfigGroupHostMappingDAO configGroupHostMappingDAO; + + private final HostDAO hostDAO; + + private final ClusterDAO clusterDAO; + + private final ConfigFactory configFactory; @AssistedInject - public ConfigGroupImpl(@Assisted("cluster") Cluster cluster, - @Assisted("name") String name, - @Assisted("tag") String tag, - @Assisted("description") String description, - @Assisted("configs") Map<String, Config> configs, - @Assisted("hosts") Map<Long, Host> hosts, - Injector injector) { - injector.injectMembers(this); + public ConfigGroupImpl(@Assisted("cluster") Cluster cluster, @Assisted("name") String name, + @Assisted("tag") String tag, @Assisted("description") String description, + @Assisted("configs") Map<String, Config> configurations, + @Assisted("hosts") Map<Long, Host> hosts, Clusters clusters, ConfigFactory configFactory, + ClusterDAO clusterDAO, HostDAO hostDAO, ConfigGroupDAO configGroupDAO, + ConfigGroupConfigMappingDAO configGroupConfigMappingDAO, + ConfigGroupHostMappingDAO configGroupHostMappingDAO, LockFactory lockFactory) { + + this.configFactory = configFactory; + this.clusterDAO = clusterDAO; + this.hostDAO = hostDAO; + this.configGroupDAO = configGroupDAO; + this.configGroupConfigMappingDAO = configGroupConfigMappingDAO; + this.configGroupHostMappingDAO = configGroupHostMappingDAO; + + hostLock = lockFactory.newReadWriteLock(hostLockLabel); + this.cluster = cluster; + configGroupName = name; - configGroupEntity = new ConfigGroupEntity(); + ConfigGroupEntity configGroupEntity = new ConfigGroupEntity(); configGroupEntity.setClusterId(cluster.getClusterId()); configGroupEntity.setGroupName(name); configGroupEntity.setTag(tag); configGroupEntity.setDescription(description); - if (hosts != null) { - this.hosts = hosts; - } else { - this.hosts = new HashMap<Long, Host>(); - } + m_hosts = hosts == null ? new ConcurrentHashMap<Long, Host>() + : new ConcurrentHashMap<>(hosts); - if (configs != null) { - configurations = configs; - } else { - configurations = new HashMap<String, Config>(); - } + m_configurations = configurations == null ? new ConcurrentHashMap<String, Config>() + : new ConcurrentHashMap<>(configurations); + + // save the entity and grab the ID + persist(configGroupEntity); + configGroupId = configGroupEntity.getGroupId(); } @AssistedInject - public ConfigGroupImpl(@Assisted Cluster cluster, - @Assisted ConfigGroupEntity configGroupEntity, - Injector injector) { - injector.injectMembers(this); + public ConfigGroupImpl(@Assisted Cluster cluster, @Assisted ConfigGroupEntity configGroupEntity, + Clusters clusters, ConfigFactory configFactory, + ClusterDAO clusterDAO, HostDAO hostDAO, ConfigGroupDAO configGroupDAO, + ConfigGroupConfigMappingDAO configGroupConfigMappingDAO, + ConfigGroupHostMappingDAO configGroupHostMappingDAO, LockFactory lockFactory) { + + this.configFactory = configFactory; + this.clusterDAO = clusterDAO; + this.hostDAO = hostDAO; + this.configGroupDAO = configGroupDAO; + this.configGroupConfigMappingDAO = configGroupConfigMappingDAO; + this.configGroupHostMappingDAO = configGroupHostMappingDAO; + + hostLock = lockFactory.newReadWriteLock(hostLockLabel); + this.cluster = cluster; + configGroupId = configGroupEntity.getGroupId(); + configGroupName = configGroupEntity.getGroupName(); - this.configGroupEntity = configGroupEntity; - configurations = new HashMap<String, Config>(); - hosts = new HashMap<Long, Host>(); + m_configurations = new ConcurrentHashMap<String, Config>(); + m_hosts = new ConcurrentHashMap<Long, Host>(); // Populate configs - for (ConfigGroupConfigMappingEntity configMappingEntity : configGroupEntity - .getConfigGroupConfigMappingEntities()) { - + for (ConfigGroupConfigMappingEntity configMappingEntity : configGroupEntity.getConfigGroupConfigMappingEntities()) { Config config = cluster.getConfig(configMappingEntity.getConfigType(), configMappingEntity.getVersionTag()); if (config != null) { - configurations.put(config.getType(), config); + m_configurations.put(config.getType(), config); } else { - LOG.warn("Unable to find config mapping for config group" - + ", clusterName = " + cluster.getClusterName() - + ", type = " + configMappingEntity.getConfigType() - + ", tag = " + configMappingEntity.getVersionTag()); + LOG.warn("Unable to find config mapping {}/{} for config group in cluster {}", + configMappingEntity.getConfigType(), configMappingEntity.getVersionTag(), + cluster.getClusterName()); } } // Populate Hosts - for (ConfigGroupHostMappingEntity hostMappingEntity : configGroupEntity - .getConfigGroupHostMappingEntities()) { - + for (ConfigGroupHostMappingEntity hostMappingEntity : configGroupEntity.getConfigGroupHostMappingEntities()) { try { Host host = clusters.getHost(hostMappingEntity.getHostname()); HostEntity hostEntity = hostMappingEntity.getHostEntity(); if (host != null && hostEntity != null) { - hosts.put(hostEntity.getHostId(), host); + m_hosts.put(hostEntity.getHostId(), host); } } catch (AmbariException e) { - String msg = "Host seems to be deleted but Config group mapping still " + - "exists !"; - LOG.warn(msg); - LOG.debug(msg, e); + LOG.warn("Host seems to be deleted but Config group mapping still exists !"); + LOG.debug("Host seems to be deleted but Config group mapping still exists !", e); } } - - isPersisted = true; } @Override public Long getId() { - return configGroupEntity.getGroupId(); + return configGroupId; } @Override public String getName() { - readWriteLock.readLock().lock(); - try { - return configGroupEntity.getGroupName(); - } finally { - readWriteLock.readLock().unlock(); - } + return configGroupName; } @Override public void setName(String name) { - readWriteLock.writeLock().lock(); - try { - configGroupEntity.setGroupName(name); - } finally { - readWriteLock.writeLock().unlock(); - } + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + configGroupEntity.setGroupName(name); + configGroupDAO.merge(configGroupEntity); + configGroupName = name; } @Override public String getClusterName() { - return configGroupEntity.getClusterEntity().getClusterName(); + return cluster.getClusterName(); } @Override public String getTag() { - readWriteLock.readLock().lock(); - try { - return configGroupEntity.getTag(); - } finally { - readWriteLock.readLock().unlock(); - } + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + return configGroupEntity.getTag(); } @Override public void setTag(String tag) { - readWriteLock.writeLock().lock(); - try { - configGroupEntity.setTag(tag); - } finally { - readWriteLock.writeLock().unlock(); - } - + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + configGroupEntity.setTag(tag); + configGroupDAO.merge(configGroupEntity); } @Override public String getDescription() { - readWriteLock.readLock().lock(); - try { - return configGroupEntity.getDescription(); - } finally { - readWriteLock.readLock().unlock(); - } + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + return configGroupEntity.getDescription(); } @Override public void setDescription(String description) { - readWriteLock.writeLock().lock(); - try { - configGroupEntity.setDescription(description); - } finally { - readWriteLock.writeLock().unlock(); - } - + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + configGroupEntity.setDescription(description); + configGroupDAO.merge(configGroupEntity); } @Override public Map<Long, Host> getHosts() { - readWriteLock.readLock().lock(); - try { - return Collections.unmodifiableMap(hosts); - } finally { - readWriteLock.readLock().unlock(); - } + return Collections.unmodifiableMap(m_hosts); } @Override public Map<String, Config> getConfigurations() { - readWriteLock.readLock().lock(); - try { - return Collections.unmodifiableMap(configurations); - } finally { - readWriteLock.readLock().unlock(); - } - + return Collections.unmodifiableMap(m_configurations); } /** @@ -259,13 +245,14 @@ public class ConfigGroupImpl implements ConfigGroup { */ @Override public void setHosts(Map<Long, Host> hosts) { - readWriteLock.writeLock().lock(); + hostLock.writeLock().lock(); try { - this.hosts = hosts; + // persist enitites in a transaction first, then update internal state + replaceHostMappings(hosts); + m_hosts = new ConcurrentHashMap<>(hosts); } finally { - readWriteLock.writeLock().unlock(); + hostLock.writeLock().unlock(); } - } /** @@ -273,115 +260,140 @@ public class ConfigGroupImpl implements ConfigGroup { * @param configs */ @Override - public void setConfigurations(Map<String, Config> configs) { - readWriteLock.writeLock().lock(); - try { - configurations = configs; - } finally { - readWriteLock.writeLock().unlock(); - } - + public void setConfigurations(Map<String, Config> configurations) { + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + ClusterEntity clusterEntity = configGroupEntity.getClusterEntity(); + + // only update the internal state after the configurations have been + // persisted + persistConfigMapping(clusterEntity, configGroupEntity, configurations); + m_configurations = new ConcurrentHashMap<>(configurations); } @Override - @Transactional public void removeHost(Long hostId) throws AmbariException { - readWriteLock.writeLock().lock(); + hostLock.writeLock().lock(); try { - if (hosts.containsKey(hostId)) { - String hostName = hosts.get(hostId).getHostName(); - LOG.info("Removing host from config group, hostid = " + hostId + ", hostname = " + hostName); - hosts.remove(hostId); - try { - ConfigGroupHostMappingEntityPK hostMappingEntityPK = new - ConfigGroupHostMappingEntityPK(); - hostMappingEntityPK.setHostId(hostId); - hostMappingEntityPK.setConfigGroupId(configGroupEntity.getGroupId()); - configGroupHostMappingDAO.removeByPK(hostMappingEntityPK); - } catch (Exception e) { - LOG.error("Failed to delete config group host mapping" - + ", clusterName = " + getClusterName() - + ", id = " + getId() - + ", hostid = " + hostId - + ", hostname = " + hostName, e); - throw new AmbariException(e.getMessage()); - } + Host host = m_hosts.get(hostId); + if (null == host) { + return; } - } finally { - readWriteLock.writeLock().unlock(); - } - } - @Override - public void persist() { - readWriteLock.writeLock().lock(); - try { - if (!isPersisted) { - persistEntities(); - refresh(); - cluster.refresh(); - isPersisted = true; - } else { - saveIfPersisted(); + String hostName = host.getHostName(); + LOG.info("Removing host (id={}, name={}) from config group", host.getHostId(), hostName); + + try { + // remove the entities first, then update internal state + removeConfigGroupHostEntity(host); + m_hosts.remove(hostId); + } catch (Exception e) { + LOG.error("Failed to delete config group host mapping for cluster {} and host {}", + cluster.getClusterName(), hostName, e); + + throw new AmbariException(e.getMessage()); } } finally { - readWriteLock.writeLock().unlock(); + hostLock.writeLock().unlock(); } } /** + * Removes the {@link ConfigGroupHostMappingEntity} for the specified host + * from this configuration group. + * + * @param host + * the host to remove. + */ + @Transactional + void removeConfigGroupHostEntity(Host host) { + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + ConfigGroupHostMappingEntityPK hostMappingEntityPK = new ConfigGroupHostMappingEntityPK(); + hostMappingEntityPK.setHostId(host.getHostId()); + hostMappingEntityPK.setConfigGroupId(configGroupId); + + ConfigGroupHostMappingEntity configGroupHostMapping = configGroupHostMappingDAO.findByPK( + hostMappingEntityPK); + + configGroupHostMappingDAO.remove(configGroupHostMapping); + + configGroupEntity.getConfigGroupHostMappingEntities().remove(configGroupHostMapping); + configGroupEntity = configGroupDAO.merge(getConfigGroupEntity()); + } + + /** + * @param configGroupEntity + */ + private void persist(ConfigGroupEntity configGroupEntity) { + persistEntities(configGroupEntity); + cluster.refresh(); + } + + /** * Persist Config group with host mapping and configurations * * @throws Exception */ @Transactional - void persistEntities() { + void persistEntities(ConfigGroupEntity configGroupEntity) { ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId()); configGroupEntity.setClusterEntity(clusterEntity); configGroupEntity.setTimestamp(System.currentTimeMillis()); configGroupDAO.create(configGroupEntity); - persistConfigMapping(clusterEntity); - persistHostMapping(); - } + configGroupId = configGroupEntity.getGroupId(); - // TODO: Test rollback scenario + persistConfigMapping(clusterEntity, configGroupEntity, m_configurations); + replaceHostMappings(m_hosts); + } /** - * Persist host mapping + * Replaces all existing host mappings with the new collection of hosts. * + * @param the + * new hosts * @throws Exception */ - @Override @Transactional - public void persistHostMapping() { - if (isPersisted) { - // Delete existing mappings and create new ones - configGroupHostMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); - configGroupEntity.setConfigGroupHostMappingEntities(new HashSet<ConfigGroupHostMappingEntity>()); - } + void replaceHostMappings(Map<Long, Host> hosts) { + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + + // Delete existing mappings and create new ones + configGroupHostMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); + configGroupEntity.setConfigGroupHostMappingEntities( + new HashSet<ConfigGroupHostMappingEntity>()); if (hosts != null && !hosts.isEmpty()) { - for (Host host : hosts.values()) { - HostEntity hostEntity = hostDAO.findById(host.getHostId()); - if (hostEntity != null) { - ConfigGroupHostMappingEntity hostMappingEntity = new - ConfigGroupHostMappingEntity(); - hostMappingEntity.setHostId(hostEntity.getHostId()); - hostMappingEntity.setHostEntity(hostEntity); - hostMappingEntity.setConfigGroupEntity(configGroupEntity); - hostMappingEntity.setConfigGroupId(configGroupEntity.getGroupId()); - configGroupEntity.getConfigGroupHostMappingEntities().add - (hostMappingEntity); - configGroupHostMappingDAO.create(hostMappingEntity); - } else { - LOG.warn("Host seems to be deleted, cannot create host to config " + - "group mapping, host = " + host.getHostName()); - } + configGroupEntity = persistHostMapping(hosts.values(), configGroupEntity); + } + } + + /** + * Adds the collection of hosts to the configuration group. + * + * @param hostEntity + * @param configGroupEntity + */ + @Transactional + ConfigGroupEntity persistHostMapping(Collection<Host> hosts, + ConfigGroupEntity configGroupEntity) { + for (Host host : hosts) { + HostEntity hostEntity = hostDAO.findById(host.getHostId()); + if (hostEntity != null) { + ConfigGroupHostMappingEntity hostMappingEntity = new ConfigGroupHostMappingEntity(); + hostMappingEntity.setHostId(hostEntity.getHostId()); + hostMappingEntity.setHostEntity(hostEntity); + hostMappingEntity.setConfigGroupEntity(configGroupEntity); + hostMappingEntity.setConfigGroupId(configGroupEntity.getGroupId()); + configGroupEntity.getConfigGroupHostMappingEntities().add(hostMappingEntity); + configGroupHostMappingDAO.create(hostMappingEntity); + } else { + LOG.warn( + "The host {} has been removed from the cluster and cannot be added to the configuration group {}", + host.getHostName(), configGroupName); } } - // TODO: Make sure this does not throw Nullpointer based on JPA docs - configGroupEntity = configGroupDAO.merge(configGroupEntity); + + return configGroupDAO.merge(configGroupEntity); } /** @@ -391,42 +403,31 @@ public class ConfigGroupImpl implements ConfigGroup { * @throws Exception */ @Transactional - void persistConfigMapping(ClusterEntity clusterEntity) { - if (isPersisted) { - configGroupConfigMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); - configGroupEntity.setConfigGroupConfigMappingEntities(new HashSet<ConfigGroupConfigMappingEntity>()); - } + void persistConfigMapping(ClusterEntity clusterEntity, + ConfigGroupEntity configGroupEntity, Map<String, Config> configurations) { + configGroupConfigMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); + configGroupEntity.setConfigGroupConfigMappingEntities( + new HashSet<ConfigGroupConfigMappingEntity>()); if (configurations != null && !configurations.isEmpty()) { - for (Config config : configurations.values()) { + for (Entry<String, Config> entry : configurations.entrySet()) { + Config config = entry.getValue(); ClusterConfigEntity clusterConfigEntity = clusterDAO.findConfig (cluster.getClusterId(), config.getType(), config.getTag()); if (clusterConfigEntity == null) { - config.setVersion(cluster.getNextConfigVersion(config.getType())); - config.setStackId(cluster.getDesiredStackVersion()); - // Create configuration - clusterConfigEntity = new ClusterConfigEntity(); - clusterConfigEntity.setClusterId(clusterEntity.getClusterId()); - clusterConfigEntity.setClusterEntity(clusterEntity); - clusterConfigEntity.setStack(clusterEntity.getDesiredStack()); - clusterConfigEntity.setType(config.getType()); - clusterConfigEntity.setVersion(config.getVersion()); - clusterConfigEntity.setTag(config.getTag()); - clusterConfigEntity.setData(gson.toJson(config.getProperties())); - if (null != config.getPropertiesAttributes()) { - clusterConfigEntity.setAttributes(gson.toJson(config.getPropertiesAttributes())); - } - clusterConfigEntity.setTimestamp(System.currentTimeMillis()); - clusterDAO.createConfig(clusterConfigEntity); - clusterEntity.getClusterConfigEntities().add(clusterConfigEntity); - cluster.addConfig(config); - clusterDAO.merge(clusterEntity); - cluster.refresh(); + config = configFactory.createNew(cluster, config.getType(), config.getTag(), + config.getProperties(), config.getPropertiesAttributes()); + + entry.setValue(config); + + clusterConfigEntity = clusterDAO.findConfig(cluster.getClusterId(), config.getType(), + config.getTag()); } ConfigGroupConfigMappingEntity configMappingEntity = new ConfigGroupConfigMappingEntity(); + configMappingEntity.setTimestamp(System.currentTimeMillis()); configMappingEntity.setClusterId(clusterEntity.getClusterId()); configMappingEntity.setClusterConfigEntity(clusterConfigEntity); @@ -443,142 +444,84 @@ public class ConfigGroupImpl implements ConfigGroup { } } - void saveIfPersisted() { - if (isPersisted) { - save(clusterDAO.findById(cluster.getClusterId())); - } - } - - @Transactional - void save(ClusterEntity clusterEntity) { - persistHostMapping(); - persistConfigMapping(clusterEntity); - } - @Override + @Transactional public void delete() { - readWriteLock.writeLock().lock(); - try { - configGroupConfigMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); - configGroupHostMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); - configGroupDAO.removeByPK(configGroupEntity.getGroupId()); - cluster.refresh(); - isPersisted = false; - } finally { - readWriteLock.writeLock().unlock(); - } + configGroupConfigMappingDAO.removeAllByGroup(configGroupId); + configGroupHostMappingDAO.removeAllByGroup(configGroupId); + configGroupDAO.removeByPK(configGroupId); + cluster.refresh(); } @Override public void addHost(Host host) throws AmbariException { - readWriteLock.writeLock().lock(); + hostLock.writeLock().lock(); try { - if (hosts != null && !hosts.isEmpty()) { - for (Host h : hosts.values()) { - if (h.getHostName().equals(host.getHostName())) { - throw new DuplicateResourceException("Host " + h.getHostName() + - "is already associated with Config Group " + - configGroupEntity.getGroupName()); - } - } - HostEntity hostEntity = hostDAO.findByName(host.getHostName()); - if (hostEntity != null) { - hosts.put(hostEntity.getHostId(), host); - } - } - } finally { - readWriteLock.writeLock().unlock(); - } - } + if (m_hosts.containsKey(host.getHostId())) { + String message = String.format( + "Host %s is already associated with the configuration group %s", host.getHostName(), + configGroupName); - @Override - public void addConfiguration(Config config) throws AmbariException { - readWriteLock.writeLock().lock(); - try { - if (configurations != null && !configurations.isEmpty()) { - for (Config c : configurations.values()) { - if (c.getType().equals(config.getType()) && c.getTag().equals - (config.getTag())) { - throw new DuplicateResourceException("Config " + config.getType() + - " with tag " + config.getTag() + " is already associated " + - "with Config Group " + configGroupEntity.getGroupName()); - } - } - configurations.put(config.getType(), config); + throw new DuplicateResourceException(message); } + + // ensure that we only update the in-memory structure if the merge was + // successful + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + persistHostMapping(Collections.singletonList(host), configGroupEntity); + m_hosts.putIfAbsent(host.getHostId(), host); } finally { - readWriteLock.writeLock().unlock(); + hostLock.writeLock().unlock(); } } @Override public ConfigGroupResponse convertToResponse() throws AmbariException { - readWriteLock.readLock().lock(); - try { - Set<Map<String, Object>> hostnames = new HashSet<Map<String, Object>>(); - for (Host host : hosts.values()) { - Map<String, Object> hostMap = new HashMap<String, Object>(); - hostMap.put("host_name", host.getHostName()); - hostnames.add(hostMap); - } - - Set<Map<String, Object>> configObjMap = new HashSet<Map<String, Object>>(); + Set<Map<String, Object>> hostnames = new HashSet<Map<String, Object>>(); + for (Host host : m_hosts.values()) { + Map<String, Object> hostMap = new HashMap<String, Object>(); + hostMap.put("host_name", host.getHostName()); + hostnames.add(hostMap); + } - for (Config config : configurations.values()) { - Map<String, Object> configMap = new HashMap<String, Object>(); - configMap.put(ConfigurationResourceProvider.CONFIGURATION_CONFIG_TYPE_PROPERTY_ID, - config.getType()); - configMap.put(ConfigurationResourceProvider.CONFIGURATION_CONFIG_TAG_PROPERTY_ID, - config.getTag()); - configObjMap.add(configMap); - } + Set<Map<String, Object>> configObjMap = new HashSet<Map<String, Object>>(); - ConfigGroupResponse configGroupResponse = new ConfigGroupResponse( - configGroupEntity.getGroupId(), cluster.getClusterName(), - configGroupEntity.getGroupName(), configGroupEntity.getTag(), - configGroupEntity.getDescription(), hostnames, configObjMap); - return configGroupResponse; - } finally { - readWriteLock.readLock().unlock(); + for (Config config : m_configurations.values()) { + Map<String, Object> configMap = new HashMap<String, Object>(); + configMap.put(ConfigurationResourceProvider.CONFIGURATION_CONFIG_TYPE_PROPERTY_ID, + config.getType()); + configMap.put(ConfigurationResourceProvider.CONFIGURATION_CONFIG_TAG_PROPERTY_ID, + config.getTag()); + configObjMap.add(configMap); } - } - @Override - @Transactional - public void refresh() { - readWriteLock.writeLock().lock(); - try { - if (isPersisted) { - ConfigGroupEntity groupEntity = configGroupDAO.findById - (configGroupEntity.getGroupId()); - configGroupDAO.refresh(groupEntity); - // TODO What other entities should refresh? - } - } finally { - readWriteLock.writeLock().unlock(); - } + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + ConfigGroupResponse configGroupResponse = new ConfigGroupResponse( + configGroupEntity.getGroupId(), cluster.getClusterName(), + configGroupEntity.getGroupName(), configGroupEntity.getTag(), + configGroupEntity.getDescription(), hostnames, configObjMap); + return configGroupResponse; } - @Override public String getServiceName() { - readWriteLock.readLock().lock(); - try { - return configGroupEntity.getServiceName(); - } finally { - readWriteLock.readLock().unlock(); - } - + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + return configGroupEntity.getServiceName(); } @Override public void setServiceName(String serviceName) { - readWriteLock.writeLock().lock(); - try { - configGroupEntity.setServiceName(serviceName); - } finally { - readWriteLock.writeLock().unlock(); - } + ConfigGroupEntity configGroupEntity = getConfigGroupEntity(); + configGroupEntity.setServiceName(serviceName); + configGroupDAO.merge(configGroupEntity); + } + /** + * Gets the {@link ConfigGroupEntity} by it's ID from the JPA cache. + * + * @return the entity. + */ + private ConfigGroupEntity getConfigGroupEntity() { + return configGroupDAO.findById(configGroupId); } }
http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java index dd5e635..a444f5d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java @@ -954,7 +954,7 @@ public class HostImpl implements Host { } @Transactional - private void persistEntities(HostEntity hostEntity) { + void persistEntities(HostEntity hostEntity) { hostDAO.create(hostEntity); if (!hostEntity.getClusterEntities().isEmpty()) { for (ClusterEntity clusterEntity : hostEntity.getClusterEntities()) { http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/state/services/RetryUpgradeActionService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/RetryUpgradeActionService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/RetryUpgradeActionService.java index 1ea5558..a92aa04 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/RetryUpgradeActionService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/RetryUpgradeActionService.java @@ -190,7 +190,7 @@ public class RetryUpgradeActionService extends AbstractScheduledService { * @param requestId Request Id to search tasks for. */ @Transactional - private void retryHoldingCommandsInRequest(Long requestId) { + void retryHoldingCommandsInRequest(Long requestId) { if (requestId == null) { return; } http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/state/stack/UpgradePack.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/UpgradePack.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/UpgradePack.java index edf5c89..76f1897 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/UpgradePack.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/UpgradePack.java @@ -52,6 +52,8 @@ import org.slf4j.LoggerFactory; @XmlAccessorType(XmlAccessType.FIELD) public class UpgradePack { + private static final String ALL_VERSIONS = "*"; + private static Logger LOG = LoggerFactory.getLogger(UpgradePack.class); /** @@ -287,10 +289,15 @@ public class UpgradePack { if (direction.isUpgrade()) { list = groups; } else { - if (type == UpgradeType.ROLLING) { - list = getDowngradeGroupsForRolling(); - } else if (type == UpgradeType.NON_ROLLING) { - list = getDowngradeGroupsForNonrolling(); + switch (type) { + case NON_ROLLING: + list = getDowngradeGroupsForNonrolling(); + break; + case HOST_ORDERED: + case ROLLING: + default: + list = getDowngradeGroupsForRolling(); + break; } } @@ -315,7 +322,6 @@ public class UpgradePack { public boolean canBeApplied(String targetVersion){ // check that upgrade pack can be applied to selected stack // converting 2.2.*.* -> 2\.2(\.\d+)?(\.\d+)?(-\d+)? - String regexPattern = getTarget().replaceAll("\\.", "\\\\."); // . -> \. regexPattern = regexPattern.replaceAll("\\\\\\.\\*", "(\\\\\\.\\\\d+)?"); // \.* -> (\.\d+)? regexPattern = regexPattern.concat("(-\\d+)?"); @@ -357,6 +363,12 @@ public class UpgradePack { private List<Grouping> getDowngradeGroupsForRolling() { List<Grouping> reverse = new ArrayList<Grouping>(); + // !!! Testing exposed groups.size() == 1 issue. Normally there's no precedent for + // a one-group upgrade pack, so take it into account anyway. + if (groups.size() == 1) { + return groups; + } + int idx = 0; int iter = 0; Iterator<Grouping> it = groups.iterator(); @@ -453,6 +465,15 @@ public class UpgradePack { } /** + * @return {@code true} if the upgrade targets any version or stack. Both + * {@link #target} and {@link #targetStack} must equal "*" + */ + public boolean isAllTarget() { + return ALL_VERSIONS.equals(target) && ALL_VERSIONS.equals(targetStack); + } + + + /** * A service definition that holds a list of components in the 'order' element. */ public static class OrderService { http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigurationCondition.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigurationCondition.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigurationCondition.java index 1bd88e4..d229270 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigurationCondition.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigurationCondition.java @@ -52,7 +52,37 @@ public final class ConfigurationCondition extends Condition { * Equals comparison. */ @XmlEnumValue("equals") - EQUALS; + EQUALS, + + /** + * Not equals comparison. + */ + @XmlEnumValue("not-equals") + NOT_EQUALS, + + /** + * String contains. + */ + @XmlEnumValue("contains") + CONTAINS, + + /** + * Does not contain. + */ + @XmlEnumValue("not-contains") + NOT_CONTAINS, + + /** + * Exists with any value. + */ + @XmlEnumValue("exists") + EXISTS, + + /** + * Does not exist. + */ + @XmlEnumValue("not-exists") + NOT_EXISTS; } /** @@ -68,12 +98,18 @@ public final class ConfigurationCondition extends Condition { public String property; /** - * The value to compare against. + * The value to compare against; only valid if comparison type is in (=, !=, contains, !contains). */ @XmlAttribute(name = "value") public String value; /** + * The value to return if comparison type is in (=, !=, contains, !contains) and the config is missing. + */ + @XmlAttribute(name = "return_value_if_config_missing") + public boolean returnValueIfConfigMissing; + + /** * The type of comparison to make. */ @XmlAttribute(name = "comparison") @@ -84,7 +120,7 @@ public final class ConfigurationCondition extends Condition { */ @Override public String toString() { - return Objects.toStringHelper(this).add("type", type).add("property", property).add(value, + return Objects.toStringHelper(this).add("type", type).add("property", property).add("value", value).add("comparison", comparisonType).omitNullValues().toString(); } @@ -94,20 +130,40 @@ public final class ConfigurationCondition extends Condition { @Override public boolean isSatisfied(UpgradeContext upgradeContext) { Cluster cluster = upgradeContext.getCluster(); + + boolean propertyExists = false; Config config = cluster.getDesiredConfigByType(type); - if (null == config) { - return false; + Map<String, String> properties = null; + if (null != config) { + properties = config.getProperties(); + if (properties.containsKey(property)) { + propertyExists = true; + } + } + + if (comparisonType == ComparisonType.EXISTS) { + return propertyExists; + } + if (comparisonType == ComparisonType.NOT_EXISTS) { + return !propertyExists; } - Map<String, String> properties = config.getProperties(); - if (MapUtils.isEmpty(properties)) { - return false; + // If property doesn't exist, we cannot make any claims using =, !=, contains !contains. + // Therefore, check if the Upgrade Pack provided a default return value when the config is missing. + if (!propertyExists) { + return returnValueIfConfigMissing; } String propertyValue = properties.get(property); switch (comparisonType) { case EQUALS: return StringUtils.equals(propertyValue, value); + case NOT_EQUALS: + return !StringUtils.equals(propertyValue, value); + case CONTAINS: + return StringUtils.contains(propertyValue, value); + case NOT_CONTAINS: + return !StringUtils.contains(propertyValue, value); default: return false; } http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java index c1655aa..5225598 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java @@ -1313,7 +1313,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { } @Transactional - private void persistEntities(HostEntity hostEntity, HostComponentStateEntity stateEntity, + void persistEntities(HostEntity hostEntity, HostComponentStateEntity stateEntity, HostComponentDesiredStateEntity desiredStateEntity) { ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity = serviceComponentDesiredStateDAO.findByName( serviceComponent.getClusterId(), serviceComponent.getServiceName(), http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java index e1f5cd2..5e887d4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java @@ -67,11 +67,10 @@ import org.apache.ambari.server.security.authorization.AuthorizationException; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; -import org.apache.ambari.server.state.ConfigImpl; +import org.apache.ambari.server.state.ConfigFactory; import org.apache.ambari.server.state.DesiredConfig; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.SecurityType; -import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.configgroup.ConfigGroup; import org.apache.ambari.server.utils.RetryHelper; import org.slf4j.Logger; @@ -91,8 +90,13 @@ public class AmbariContext { @Inject private PersistedState persistedState; + /** + * Used for creating read-only instances of existing {@link Config} in order + * to send them to the {@link ConfigGroupResourceProvider} to create + * {@link ConfigGroup}s. + */ @Inject - private org.apache.ambari.server.configuration.Configuration configs; + ConfigFactory configFactory; private static AmbariManagementController controller; private static ClusterController clusterController; @@ -458,11 +462,13 @@ public class AmbariContext { SortedSet<DesiredConfig> desiredConfigsOrderedByVersion = new TreeSet<>(new Comparator<DesiredConfig>() { @Override public int compare(DesiredConfig o1, DesiredConfig o2) { - if (o1.getVersion() < o2.getVersion()) + if (o1.getVersion() < o2.getVersion()) { return -1; + } - if (o1.getVersion() > o2.getVersion()) + if (o1.getVersion() > o2.getVersion()) { return 1; + } return 0; } @@ -473,9 +479,9 @@ public class AmbariContext { int tagMatchState = 0; // 0 -> INITIAL -> tagMatchState = 1 -> TOPLOGY_RESOLVED -> tagMatchState = 2 for (DesiredConfig config: desiredConfigsOrderedByVersion) { - if (config.getTag().equals(TopologyManager.INITIAL_CONFIG_TAG) && tagMatchState == 0) + if (config.getTag().equals(TopologyManager.INITIAL_CONFIG_TAG) && tagMatchState == 0) { tagMatchState = 1; - else if (config.getTag().equals(TopologyManager.TOPOLOGY_RESOLVED_TAG) && tagMatchState == 1) { + } else if (config.getTag().equals(TopologyManager.TOPOLOGY_RESOLVED_TAG) && tagMatchState == 1) { tagMatchState = 2; break; } @@ -551,7 +557,6 @@ public class AmbariContext { addedHost = true; if (! group.getHosts().containsKey(host.getHostId())) { group.addHost(host); - group.persistHostMapping(); } } catch (AmbariException e) { @@ -585,9 +590,7 @@ public class AmbariContext { for (Map.Entry<String, Map<String, String>> entry : userProvidedGroupProperties.entrySet()) { String type = entry.getKey(); String service = stack.getServiceForConfigType(type); - Config config = new ConfigImpl(type); - config.setTag(groupName); - config.setProperties(entry.getValue()); + Config config = configFactory.createReadOnly(type, groupName, entry.getValue(), null); //todo: attributes Map<String, Config> serviceConfigs = groupConfigs.get(service); if (serviceConfigs == null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index d6a4bdd..d527b2d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -596,6 +596,14 @@ public class TopologyManager { return clusterTopologyMap.get(clusterId); } + /** + * Gets a map of components keyed by host which have operations in the + * {@link HostRoleStatus#PENDING} state. This could either be because hosts + * have not registered or becuase the operations are actually waiting to be + * queued. + * + * @return a mapping of host with pending components. + */ public Map<String, Collection<String>> getPendingHostComponents() { ensureInitialized(); Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>(); @@ -603,7 +611,16 @@ public class TopologyManager { for (LogicalRequest logicalRequest : allRequests.values()) { Map<Long, HostRoleCommandStatusSummaryDTO> summary = logicalRequest.getStageSummaries(); final CalculatedStatus status = CalculatedStatus.statusFromStageSummary(summary, summary.keySet()); - if (status.getStatus().isInProgress()) { + + // either use the calculated status of the stage or the fact that there + // are no tasks and the request has no end time to determine if the + // request is still in progress + boolean logicalRequestInProgress = false; + if (status.getStatus().isInProgress() || (summary.isEmpty() && logicalRequest.getEndTime() <= 0) ) { + logicalRequestInProgress = true; + } + + if (logicalRequestInProgress) { Map<String, Collection<String>> requestTopology = logicalRequest.getProjectedTopology(); for (Map.Entry<String, Collection<String>> entry : requestTopology.entrySet()) { String host = entry.getKey(); http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/update/HostUpdateHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/update/HostUpdateHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/update/HostUpdateHelper.java index 6a8057c..4c1ef5a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/update/HostUpdateHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/update/HostUpdateHelper.java @@ -53,8 +53,8 @@ import org.apache.ambari.server.orm.entities.TopologyRequestEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.ConfigFactory; import org.apache.ambari.server.state.ConfigHelper; -import org.apache.ambari.server.state.ConfigImpl; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.utils.EventBusSynchronizer; import org.apache.commons.lang.StringUtils; @@ -234,12 +234,12 @@ public class HostUpdateHelper { boolean configUpdated; // going through all cluster configs and update property values + ConfigFactory configFactory = injector.getInstance(ConfigFactory.class); for (ClusterConfigEntity clusterConfigEntity : clusterConfigEntities) { - ConfigImpl config = new ConfigImpl(cluster, clusterConfigEntity, injector); + Config config = configFactory.createExisting(cluster, clusterConfigEntity); configUpdated = false; for (Map.Entry<String,String> property : config.getProperties().entrySet()) { - updatedPropertyValue = replaceHosts(property.getValue(), currentHostNames, hostMapping); if (updatedPropertyValue != null) { @@ -249,8 +249,9 @@ public class HostUpdateHelper { configUpdated = true; } } + if (configUpdated) { - config.persist(false); + config.save(); } } } @@ -317,6 +318,7 @@ public class HostUpdateHelper { * */ public class StringComparator implements Comparator<String> { + @Override public int compare(String s1, String s2) { return s2.length() - s1.length(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java index 1038766..dbf45c3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java @@ -17,6 +17,11 @@ */ package org.apache.ambari.server.upgrade; +import javax.persistence.EntityManager; +import javax.xml.bind.JAXBException; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + import java.io.File; import java.io.FileReader; import java.io.FilenameFilter; @@ -42,11 +47,6 @@ import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.persistence.EntityManager; -import javax.xml.bind.JAXBException; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; @@ -447,11 +447,16 @@ public abstract class AbstractUpgradeCatalog implements UpgradeCatalog { Config hdfsSiteConfig = cluster.getDesiredConfigByType(CONFIGURATION_TYPE_HDFS_SITE); if (hdfsSiteConfig != null) { Map<String, String> properties = hdfsSiteConfig.getProperties(); + if (properties.containsKey("dfs.internal.nameservices")) { + return true; + } String nameServices = properties.get(PROPERTY_DFS_NAMESERVICES); if (!StringUtils.isEmpty(nameServices)) { - String namenodes = properties.get(String.format("dfs.ha.namenodes.%s", nameServices)); - if (!StringUtils.isEmpty(namenodes)) { - return (namenodes.split(",").length > 1); + for (String nameService : nameServices.split(",")) { + String namenodes = properties.get(String.format("dfs.ha.namenodes.%s", nameService)); + if (!StringUtils.isEmpty(namenodes)) { + return (namenodes.split(",").length > 1); + } } } } @@ -580,8 +585,8 @@ public abstract class AbstractUpgradeCatalog implements UpgradeCatalog { } if (!Maps.difference(oldConfigProperties, mergedProperties).areEqual()) { - LOG.info("Applying configuration with tag '{}' to " + - "cluster '{}'", newTag, cluster.getClusterName()); + LOG.info("Applying configuration with tag '{}' and configType '{}' to " + + "cluster '{}'", newTag, configType, cluster.getClusterName()); Map<String, Map<String, String>> propertiesAttributes = null; if (oldConfig != null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog211.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog211.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog211.java index db13612..eb835ef 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog211.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog211.java @@ -212,8 +212,8 @@ public class UpgradeCatalog211 extends AbstractUpgradeCatalog { statement = dbAccessor.getConnection().createStatement(); if (statement != null) { String selectSQL = MessageFormat.format( - "SELECT cluster_id, service_name, component_name, host_id FROM {0}", - HOST_COMPONENT_STATE_TABLE); + "SELECT id, cluster_id, service_name, component_name, host_id FROM {0} ORDER BY {1} {2}", + HOST_COMPONENT_STATE_TABLE, "id", "DESC"); resultSet = statement.executeQuery(selectSQL); while (resultSet.next()) { @@ -221,13 +221,19 @@ public class UpgradeCatalog211 extends AbstractUpgradeCatalog { final String serviceName = resultSet.getString("service_name"); final String componentName = resultSet.getString("component_name"); final Long hostId = resultSet.getLong("host_id"); - - String updateSQL = MessageFormat.format( - "UPDATE {0} SET {1} = {2,number,#} WHERE cluster_id = {3} AND service_name = ''{4}'' AND component_name = ''{5}'' and host_id = {6,number,#}", - HOST_COMPONENT_STATE_TABLE, HOST_COMPONENT_STATE_ID_COLUMN, m_hcsId.getAndIncrement(), - clusterId, serviceName, componentName, hostId); - - dbAccessor.executeQuery(updateSQL); + final Long idKey = resultSet.getLong("id"); + + if (idKey != 0 && m_hcsId.get() == 1) { + m_hcsId.set(idKey); + m_hcsId.getAndIncrement(); + } else if(idKey == 0) { + String updateSQL = MessageFormat.format( + "UPDATE {0} SET {1} = {2,number,#} WHERE cluster_id = {3} AND service_name = ''{4}'' AND component_name = ''{5}'' and host_id = {6,number,#}", + HOST_COMPONENT_STATE_TABLE, HOST_COMPONENT_STATE_ID_COLUMN, m_hcsId.getAndIncrement(), + clusterId, serviceName, componentName, hostId); + + dbAccessor.executeQuery(updateSQL); + } } } } finally { http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java index 25b6360..063c295 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java @@ -1553,7 +1553,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { * @throws SQLException */ @Transactional - private void updateServiceComponentDesiredStateTableDDL() throws SQLException { + void updateServiceComponentDesiredStateTableDDL() throws SQLException { if (dbAccessor.tableHasPrimaryKey(SERVICE_COMPONENT_DS_TABLE, ID)) { LOG.info("Skipping {} table Primary Key modifications since the new {} column already exists", SERVICE_COMPONENT_DS_TABLE, ID); @@ -2755,7 +2755,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { * instead of cluster_name */ @Transactional - private void updateViewInstanceTable() throws SQLException { + void updateViewInstanceTable() throws SQLException { try { if (Long.class.equals(dbAccessor.getColumnClass(VIEWINSTANCE_TABLE, CLUSTER_HANDLE_COLUMN))) { LOG.info(String.format("%s column is already numeric. Skipping an update of %s table.", CLUSTER_HANDLE_COLUMN, VIEWINSTANCE_TABLE)); http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java index 1f44e28..e50b645 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java @@ -19,14 +19,6 @@ package org.apache.ambari.server.upgrade; import com.google.inject.Inject; import com.google.inject.Injector; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.CommandExecutionType; import org.apache.ambari.server.controller.AmbariManagementController; @@ -40,6 +32,14 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + /** * Upgrade catalog for version 2.5.0. */ @@ -137,6 +137,8 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog { updateHIVEInteractiveConfigs(); updateTEZInteractiveConfigs(); updateHiveLlapConfigs(); + updateTablesForZeppelinViewRemoval(); + updateAtlasConfigs(); } protected void updateHostVersionTable() throws SQLException { @@ -178,6 +180,11 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog { } } + protected void updateTablesForZeppelinViewRemoval() throws SQLException { + dbAccessor.executeQuery("DELETE from viewinstance WHERE view_name='ZEPPELIN{1.0.0}'", true); + dbAccessor.executeQuery("DELETE from viewmain WHERE view_name='ZEPPELIN{1.0.0}'", true); + dbAccessor.executeQuery("DELETE from viewparameter WHERE view_name='ZEPPELIN{1.0.0}'", true); + } protected String updateAmsEnvContent(String content) { if (content == null) { @@ -374,5 +381,33 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog { } } } -} + protected void updateAtlasConfigs() throws AmbariException { + AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); + Clusters clusters = ambariManagementController.getClusters(); + if (clusters != null) { + Map<String, Cluster> clusterMap = clusters.getClusters(); + if (clusterMap != null && !clusterMap.isEmpty()) { + for (final Cluster cluster : clusterMap.values()) { + updateAtlasHookConfig(cluster, "HIVE", "hive-env", "hive.atlas.hook"); + updateAtlasHookConfig(cluster, "STORM", "storm-env", "storm.atlas.hook"); + updateAtlasHookConfig(cluster, "FALCON", "falcon-env", "falcon.atlas.hook"); + updateAtlasHookConfig(cluster, "SQOOP", "sqoop-env", "sqoop.atlas.hook"); + } + } + } + } + + protected void updateAtlasHookConfig(Cluster cluster, String serviceName, String configType, String propertyName) throws AmbariException { + Set<String> installedServices = cluster.getServices().keySet(); + if (installedServices.contains("ATLAS") && installedServices.contains(serviceName)) { + Config configEnv = cluster.getDesiredConfigByType(configType); + if (configEnv != null) { + Map<String, String> newProperties = new HashMap<>(); + newProperties.put(propertyName, "true"); + boolean updateProperty = configEnv.getProperties().containsKey(propertyName); + updateConfigurationPropertiesForCluster(cluster, configType, newProperties, updateProperty, true); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/java/org/apache/ambari/server/utils/RequestUtils.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/RequestUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/RequestUtils.java index 0ac782f..dbb0f11 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/utils/RequestUtils.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/RequestUtils.java @@ -50,6 +50,9 @@ public class RequestUtils { if (isRemoteAddressUnknown(ip)) { ip = request.getRemoteAddr(); } + if (containsMultipleRemoteAddresses(ip)) { + ip = ip.substring(0, ip.indexOf(",")); + } return ip; } @@ -76,6 +79,13 @@ public class RequestUtils { } /** + * Checks if ip contains multiple IP addresses + */ + private static boolean containsMultipleRemoteAddresses(String ip) { + return ip != null && ip.indexOf(",") > 0; + } + + /** * Checks if RequestContextHolder contains a valid HTTP request * @return */ http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/python/ambari_server/serverConfiguration.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari_server/serverConfiguration.py b/ambari-server/src/main/python/ambari_server/serverConfiguration.py index 2e4372b..2cec61d 100644 --- a/ambari-server/src/main/python/ambari_server/serverConfiguration.py +++ b/ambari-server/src/main/python/ambari_server/serverConfiguration.py @@ -186,7 +186,6 @@ SETUP_OR_UPGRADE_MSG = "- If this is a new setup, then run the \"ambari-server s DEFAULT_DB_NAME = "ambari" SECURITY_KEYS_DIR = "security.server.keys_dir" -DASHBOARD_PATH_PROPERTY = 'dashboards.path' EXTENSION_PATH_PROPERTY = 'extensions.path' COMMON_SERVICES_PATH_PROPERTY = 'common.services.path' MPACKS_STAGING_PATH_PROPERTY = 'mpacks.staging.path' @@ -398,8 +397,8 @@ class ServerConfigDefaults(object): self.EXTENSION_LOCATION_DEFAULT = "" self.COMMON_SERVICES_LOCATION_DEFAULT = "" self.MPACKS_STAGING_LOCATION_DEFAULT = "" - self.DASHBOARD_LOCATION_DEFAULT = "" self.SERVER_TMP_DIR_DEFAULT = "" + self.DASHBOARD_DIRNAME = "dashboards" self.DEFAULT_VIEWS_DIR = "" @@ -470,7 +469,6 @@ class ServerConfigDefaultsWindows(ServerConfigDefaults): self.EXTENSION_LOCATION_DEFAULT = "resources\\extensions" self.COMMON_SERVICES_LOCATION_DEFAULT = "resources\\common-services" self.MPACKS_STAGING_LOCATION_DEFAULT = "resources\\mpacks" - self.DASHBOARD_LOCATION_DEFAULT = "resources\\dashboards" self.SERVER_TMP_DIR_DEFAULT = "data\\tmp" self.DEFAULT_VIEWS_DIR = "resources\\views" @@ -557,7 +555,6 @@ class ServerConfigDefaultsLinux(ServerConfigDefaults): self.EXTENSION_LOCATION_DEFAULT = AmbariPath.get("/var/lib/ambari-server/resources/extensions") self.COMMON_SERVICES_LOCATION_DEFAULT = AmbariPath.get("/var/lib/ambari-server/resources/common-services") self.MPACKS_STAGING_LOCATION_DEFAULT = AmbariPath.get("/var/lib/ambari-server/resources/mpacks") - self.DASHBOARD_LOCATION_DEFAULT = AmbariPath.get("/var/lib/ambari-server/resources/dashboards") self.SERVER_TMP_DIR_DEFAULT = AmbariPath.get("/var/lib/ambari-server/data/tmp") self.DEFAULT_VIEWS_DIR = AmbariPath.get("/var/lib/ambari-server/resources/views") @@ -1438,13 +1435,8 @@ def get_mpacks_staging_location(properties): # Dashboard location # def get_dashboard_location(properties): - try: - dashboard_location = properties[DASHBOARD_PATH_PROPERTY] - except KeyError: - dashboard_location = configDefaults.DASHBOARD_LOCATION_DEFAULT - - if not dashboard_location: - dashboard_location = configDefaults.DASHBOARD_LOCATION_DEFAULT + resources_dir = get_resources_location(properties) + dashboard_location = os.path.join(resources_dir, configDefaults.DASHBOARD_DIRNAME) return dashboard_location # http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/python/ambari_server/serverUpgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari_server/serverUpgrade.py b/ambari-server/src/main/python/ambari_server/serverUpgrade.py index 7d7a9ab..6f17900 100644 --- a/ambari-server/src/main/python/ambari_server/serverUpgrade.py +++ b/ambari-server/src/main/python/ambari_server/serverUpgrade.py @@ -163,7 +163,7 @@ def run_stack_upgrade(args, stackName, stackVersion, repo_url, repo_url_os): "updateStackId", "'" + json.dumps(stackId) + "'") (retcode, stdout, stderr) = run_os_command(command) - print_info_msg("Return code from stack upgrade command, retcode = " + str(retcode)) + print_info_msg("Return code from stack upgrade command, retcode = {0}".format(str(retcode))) if retcode > 0: print_error_msg("Error executing stack upgrade, please check the server logs.") return retcode @@ -182,10 +182,9 @@ def run_metainfo_upgrade(args, keyValueMap=None): 'updateMetaInfo', "'" + json.dumps(keyValueMap) + "'") (retcode, stdout, stderr) = run_os_command(command) - print_info_msg("Return code from stack upgrade command, retcode = " + str(retcode)) + print_info_msg("Return code from stack upgrade command, retcode = {0}".format(str(retcode))) if retcode > 0: - print_error_msg("Error executing metainfo upgrade, please check the " - "server logs.") + print_error_msg("Error executing metainfo upgrade, please check the server logs.") return retcode @@ -195,7 +194,7 @@ def run_metainfo_upgrade(args, keyValueMap=None): # def change_objects_owner(args): - print 'Fixing database objects owner' + print_info_msg('Fixing database objects owner', True) properties = Properties() #Dummy, args contains the dbms name and parameters already @@ -228,8 +227,8 @@ def upgrade_local_repo(args): repo_file = os.path.join(stack_root, stack_version_local, "repos", "repoinfo.xml") - print_info_msg("Local repo file: " + repo_file_local) - print_info_msg("Repo file: " + repo_file_local) + print_info_msg("Local repo file: {0}".format(repo_file_local)) + print_info_msg("Repo file: {0}".format(repo_file_local)) metainfo_update_items = {} @@ -267,7 +266,7 @@ def run_schema_upgrade(args): ensure_jdbc_driver_is_installed(args, get_ambari_properties()) - print 'Upgrading database schema' + print_info_msg('Upgrading database schema', True) serverClassPath = ServerClassPath(get_ambari_properties(), args) class_path = serverClassPath.get_full_ambari_classpath_escaped_for_shell(validate_classpath=True) @@ -284,19 +283,19 @@ def run_schema_upgrade(args): environ = generate_env(args, ambari_user, current_user) (retcode, stdout, stderr) = run_os_command(command, env=environ) - print_info_msg("Return code from schema upgrade command, retcode = " + str(retcode)) + print_info_msg("Return code from schema upgrade command, retcode = {0}".format(str(retcode)), True) if stdout: - print "Console output from schema upgrade command:" - print stdout - print - if stderr: - print "Error output from schema upgrade command:" - print stderr + print_info_msg("Console output from schema upgrade command:", True) + print_info_msg(stdout, True) print if retcode > 0: print_error_msg("Error executing schema upgrade, please check the server logs.") + if stderr: + print_error_msg("Error output from schema upgrade command:") + print_error_msg(stderr) + print else: - print_info_msg('Schema upgrade completed') + print_info_msg('Schema upgrade completed', True) return retcode @@ -335,16 +334,17 @@ def move_user_custom_actions(): raise FatalException(1, err) def upgrade(args): - logger.info("Upgrade ambari-server.") + print_info_msg("Upgrade Ambari Server", True) if not is_root(): err = configDefaults.MESSAGE_ERROR_UPGRADE_NOT_ROOT raise FatalException(4, err) - print 'Updating properties in ' + AMBARI_PROPERTIES_FILE + ' ...' + print_info_msg('Updating Ambari Server properties in {0} ...'.format(AMBARI_PROPERTIES_FILE), True) retcode = update_ambari_properties() if not retcode == 0: err = AMBARI_PROPERTIES_FILE + ' file can\'t be updated. Exiting' raise FatalException(retcode, err) + print_info_msg('Updating Ambari Server properties in {0} ...'.format(AMBARI_ENV_FILE), True) retcode = update_ambari_env() if not retcode == 0: err = AMBARI_ENV_FILE + ' file can\'t be updated. Exiting' @@ -354,7 +354,7 @@ def upgrade(args): if retcode == -2: pass # no changes done, let's be silent elif retcode == 0: - print 'File ' + AMBARI_KRB_JAAS_LOGIN_FILE + ' updated.' + print_info_msg("File {0} updated.".format(AMBARI_KRB_JAAS_LOGIN_FILE), True) elif not retcode == 0: err = AMBARI_KRB_JAAS_LOGIN_FILE + ' file can\'t be updated. Exiting' raise FatalException(retcode, err) http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/python/ambari_server/utils.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari_server/utils.py b/ambari-server/src/main/python/ambari_server/utils.py index 62c93ae..4c8304b 100644 --- a/ambari-server/src/main/python/ambari_server/utils.py +++ b/ambari-server/src/main/python/ambari_server/utils.py @@ -125,17 +125,18 @@ def save_main_pid_ex(pids, pidfile, exclude_list=[], skip_daemonize=False): """ pid_saved = False try: - pfile = open(pidfile, "w") - for item in pids: - if pid_exists(item["pid"]) and (item["exe"] not in exclude_list): - pfile.write("%s\n" % item["pid"]) - pid_saved = True - logger.info("Ambari server started with PID " + str(item["pid"])) - if pid_exists(item["pid"]) and (item["exe"] in exclude_list) and not skip_daemonize: - try: - os.kill(int(item["pid"]), signal.SIGKILL) - except: - pass + if pids: + pfile = open(pidfile, "w") + for item in pids: + if pid_exists(item["pid"]) and (item["exe"] not in exclude_list): + pfile.write("%s\n" % item["pid"]) + pid_saved = True + logger.info("Ambari server started with PID " + str(item["pid"])) + if pid_exists(item["pid"]) and (item["exe"] in exclude_list) and not skip_daemonize: + try: + os.kill(int(item["pid"]), signal.SIGKILL) + except: + pass except IOError as e: logger.error("Failed to write PID to " + pidfile + " due to " + str(e)) pass http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/python/ambari_server_main.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari_server_main.py b/ambari-server/src/main/python/ambari_server_main.py index 572de4e..6c77522 100644 --- a/ambari-server/src/main/python/ambari_server_main.py +++ b/ambari-server/src/main/python/ambari_server_main.py @@ -28,6 +28,7 @@ from ambari_commons.logging_utils import get_debug_mode, print_warning_msg, prin from ambari_commons.os_check import OSConst from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl from ambari_commons.os_utils import is_root, run_os_command +from ambari_server.ambariPath import AmbariPath from ambari_server.dbConfiguration import ensure_dbms_is_running, ensure_jdbc_driver_is_installed from ambari_server.serverConfiguration import configDefaults, find_jdk, get_ambari_properties, \ get_conf_dir, get_is_persisted, get_is_secure, get_java_exe_path, get_original_master_key, read_ambari_user, \ @@ -120,6 +121,8 @@ AMBARI_SERVER_NOT_STARTED_MSG = "Ambari Server java process hasn't been started ULIMIT_OPEN_FILES_KEY = 'ulimit.open.files' ULIMIT_OPEN_FILES_DEFAULT = 10000 +AMBARI_ENV_FILE = AmbariPath.get("/var/lib/ambari-server/ambari-env.sh") + @OsFamilyFuncImpl(OSConst.WINSRV_FAMILY) def ensure_server_security_is_configured(): pass @@ -178,9 +181,10 @@ def generate_child_process_param_list(ambari_user, java_exe, class_path, # from subprocess, we have to skip --login option of su command. That's why # we change dir to / (otherwise subprocess can face with 'permission denied' # errors while trying to list current directory - cmd = "{ulimit_cmd} ; {su} {ambari_user} -s {sh_shell} -c '{command}'".format(ulimit_cmd=ulimit_cmd, + cmd = "{ulimit_cmd} ; {su} {ambari_user} -s {sh_shell} -c 'source {ambari_env_file} ; {command}'".format(ulimit_cmd=ulimit_cmd, su=locate_file('su', '/bin'), ambari_user=ambari_user, - sh_shell=locate_file('sh', '/bin'), command=command) + sh_shell=locate_file('sh', '/bin'), command=command, + ambari_env_file=AMBARI_ENV_FILE) else: cmd = "{ulimit_cmd} ; {command}".format(ulimit_cmd=ulimit_cmd, command=command) http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql index b01ed2f..82ce31e 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql @@ -26,7 +26,16 @@ delimiter ; # USE @schema; -SET default_storage_engine=INNODB; +-- Set default_storage_engine to InnoDB +-- storage_engine variable should be used for versions prior to MySQL 5.6 +set @version_short = substring_index(@@version, '.', 2); +set @major = cast(substring_index(@version_short, '.', 1) as SIGNED); +set @minor = cast(substring_index(@version_short, '.', -1) as SIGNED); +set @engine_stmt = IF(@major >= 5 AND @minor>=6, 'SET default_storage_engine=INNODB', 'SET storage_engine=INNODB'); +prepare statement from @engine_stmt; +execute statement; +DEALLOCATE PREPARE statement; + CREATE TABLE stack( stack_id BIGINT NOT NULL, http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py index e00c1f5..53cf002 100644 --- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py @@ -122,7 +122,7 @@ info_num_logs = config['configurations']['accumulo-log4j']['info_num_logs'] # metrics2 properties ganglia_server_hosts = default('/clusterHostInfo/ganglia_server_host', []) # is not passed when ganglia is not present ganglia_server_host = '' if len(ganglia_server_hosts) == 0 else ganglia_server_hosts[0] -ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", []) +ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", [])) has_metric_collector = not len(ams_collector_hosts) == 0 if has_metric_collector: if 'cluster-env' in config['configurations'] and \ http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/metainfo.xml index 8e1051b..b94812b 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/metainfo.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/metainfo.xml @@ -34,6 +34,7 @@ <commandScript> <script>scripts/infra_solr.py</script> <scriptType>PYTHON</scriptType> + <timeout>1800</timeout> </commandScript> <logs> <log> http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/package/scripts/setup_infra_solr.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/package/scripts/setup_infra_solr.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/package/scripts/setup_infra_solr.py index 41cb504..5143cbb 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/package/scripts/setup_infra_solr.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA/0.1.0/package/scripts/setup_infra_solr.py @@ -112,4 +112,5 @@ def create_ambari_solr_znode(): solr_cloud_util.create_znode( zookeeper_quorum=params.zookeeper_quorum, solr_znode=params.infra_solr_znode, - java64_home=params.java64_home) \ No newline at end of file + java64_home=params.java64_home, + retry=30, interval=5) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml index 6651b6c..81fa4c7 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml @@ -82,7 +82,7 @@ </property> <property> <name>failover_strategy_blacklisted_interval</name> - <value>600</value> + <value>300</value> <description> Metrics collector host will be blacklisted for specified number of seconds if metric monitor failed to connect to it. </description> http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml index b7008cf..76f8660 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml @@ -716,4 +716,16 @@ </description> <on-ambari-upgrade add="true"/> </property> + <property> + <name>cluster.zookeeper.quorum</name> + <value>{{cluster_zookeeper_quorum_hosts}}</value> + <description>Comma separated list of servers in the cluster ZooKeeper Quorum. + </description> + <on-ambari-upgrade add="true"/> + </property> + <property> + <name>cluster.zookeeper.property.clientPort</name> + <value>{{cluster_zookeeper_clientPort}}</value> + <on-ambari-upgrade add="true"/> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml index 43a4320..740a91a 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml @@ -101,6 +101,7 @@ <category>MASTER</category> <cardinality>0-1</cardinality> <versionAdvertised>false</versionAdvertised> + <customFolder>dashboards</customFolder> <commandScript> <script>scripts/metrics_grafana.py</script> <scriptType>PYTHON</scriptType> http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py index feabdb2..d9fd9f6 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py @@ -364,6 +364,7 @@ def ams(name=None, action=None): Directory(params.ams_monitor_pid_dir, owner=params.ams_user, group=params.user_group, + cd_access="a", mode=0755, create_parents = True ) http://git-wip-us.apache.org/repos/asf/ambari/blob/4278c4a4/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py index 214c1e8..99df380 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py @@ -38,6 +38,8 @@ class AmsCollector(Script): def configure(self, env, action = None): import params env.set_params(params) + if action == 'start' and params.embedded_mode_multiple_instances: + raise Fail("AMS in embedded mode cannot have more than 1 instance. Delete all but 1 instances or switch to Distributed mode ") hbase('master', action) hbase('regionserver', action) ams(name='collector')