Repository: ambari Updated Branches: refs/heads/trunk 65724a9b9 -> 75555dc56
AMBARI-14720. Exporting Blueprint doesn't have some configs. (Daniel Gergely via stoader) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/75555dc5 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/75555dc5 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/75555dc5 Branch: refs/heads/trunk Commit: 75555dc56810bc296294d44623ace1bac598511a Parents: 65724a9 Author: Toader, Sebastian <stoa...@hortonworks.com> Authored: Fri Mar 4 09:23:52 2016 +0100 Committer: Toader, Sebastian <stoa...@hortonworks.com> Committed: Fri Mar 4 09:23:52 2016 +0100 ---------------------------------------------------------------------- .../BlueprintConfigurationProcessor.java | 66 ++++++++++++++++-- .../BlueprintConfigurationProcessorTest.java | 72 ++++++++++++++++++++ 2 files changed, 134 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/75555dc5/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java index 2d9a851..f5e7578 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java @@ -101,6 +101,12 @@ public class BlueprintConfigurationProcessor { new HashMap<String, Map<String, PropertyUpdater>>(); /** + * Non topology related updaters + */ + private static Map<String, Map<String, PropertyUpdater>> nonTopologyUpdaters = + new HashMap<String, Map<String, PropertyUpdater>>(); + + /** * Updaters that preserve the original property value, functions * as a placeholder for DB-related properties that need to be * removed from export, but do not require an update during @@ -360,6 +366,8 @@ public class BlueprintConfigurationProcessor { doMultiHostExportUpdate(multiHostTopologyUpdaters, configuration); + doNonTopologyUpdate(nonTopologyUpdaters, configuration); + doRemovePropertyExport(removePropertyUpdaters, configuration); doFilterPriorToExport(configuration); @@ -1109,6 +1117,28 @@ public class BlueprintConfigurationProcessor { } /** + * Update non topology related configuration properties for blueprint export. + * + * @param updaters registered non topology updaters + * @param configuration configuration being processed + */ + private void doNonTopologyUpdate(Map<String, Map<String, PropertyUpdater>> updaters, Configuration configuration) { + Map<String, Map<String, String>> properties = configuration.getFullProperties(); + for (Map.Entry<String, Map<String, PropertyUpdater>> entry : updaters.entrySet()) { + String type = entry.getKey(); + for (String propertyName : entry.getValue().keySet()) { + NonTopologyUpdater npu = (NonTopologyUpdater) entry.getValue().get(propertyName); + Map<String, String> typeProperties = properties.get(type); + + if (typeProperties != null && typeProperties.containsKey(propertyName)) { + String newValue = npu.updateForBlueprintExport(propertyName, typeProperties.get(propertyName), properties, clusterTopology); + configuration.setProperty(type, propertyName, newValue); + } + } + } + } + + /** * Provides functionality to update a property value. */ public interface PropertyUpdater { @@ -2137,6 +2167,13 @@ public class BlueprintConfigurationProcessor { ClusterTopology topology) { return Collections.emptyList(); } + + public String updateForBlueprintExport(String propertyName, + String origValue, + Map<String, Map<String, String>> properties, + ClusterTopology topology) { + return origValue; + } } @@ -2149,6 +2186,7 @@ public class BlueprintConfigurationProcessor { allUpdaters.add(multiHostTopologyUpdaters); allUpdaters.add(dbHostTopologyUpdaters); allUpdaters.add(mPropertyUpdaters); + allUpdaters.add(nonTopologyUpdaters); Map<String, PropertyUpdater> hdfsSiteMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> mapredSiteMap = new HashMap<String, PropertyUpdater>(); @@ -2156,12 +2194,15 @@ public class BlueprintConfigurationProcessor { Map<String, PropertyUpdater> hbaseSiteMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> yarnSiteMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> hiveSiteMap = new HashMap<String, PropertyUpdater>(); + Map<String, PropertyUpdater> hiveSiteNonTopologyMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> oozieSiteOriginalValueMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> oozieSiteMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> stormSiteMap = new HashMap<String, PropertyUpdater>(); + Map<String, PropertyUpdater> stormSiteNonTopologyMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> accumuloSiteMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> falconStartupPropertiesMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> kafkaBrokerMap = new HashMap<String, PropertyUpdater>(); + Map<String, PropertyUpdater> kafkaBrokerNonTopologyMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> atlasPropsMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> mapredEnvMap = new HashMap<String, PropertyUpdater>(); Map<String, PropertyUpdater> hadoopEnvMap = new HashMap<String, PropertyUpdater>(); @@ -2229,6 +2270,10 @@ public class BlueprintConfigurationProcessor { removePropertyUpdaters.put("oozie-env", oozieEnvOriginalValueMap); removePropertyUpdaters.put("oozie-site", oozieSiteOriginalValueMap); + nonTopologyUpdaters.put("hive-site", hiveSiteNonTopologyMap); + nonTopologyUpdaters.put("kafka-broker", kafkaBrokerNonTopologyMap); + nonTopologyUpdaters.put("storm-site", stormSiteNonTopologyMap); + //todo: Need to change updaters back to being static //todo: will need to pass ClusterTopology in as necessary @@ -2293,7 +2338,7 @@ public class BlueprintConfigurationProcessor { multiHiveSiteMap.put("hive.cluster.delegation.token.store.zookeeper.connectString", new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER")); // HIVE Atlas integration - hiveSiteMap.put("hive.exec.post.hooks", new NonTopologyUpdater() { + hiveSiteNonTopologyMap.put("hive.exec.post.hooks", new NonTopologyUpdater() { @Override public String updateForClusterCreate(String propertyName, String origValue, @@ -2314,7 +2359,7 @@ public class BlueprintConfigurationProcessor { }); //todo: john - this property should be moved to atlas configuration - hiveSiteMap.put("atlas.cluster.name", new NonTopologyUpdater() { + hiveSiteNonTopologyMap.put("atlas.cluster.name", new NonTopologyUpdater() { @Override public String updateForClusterCreate(String propertyName, String origValue, @@ -2334,6 +2379,19 @@ public class BlueprintConfigurationProcessor { return origValue; } } + + @Override + public String updateForBlueprintExport(String propertyName, + String origValue, + Map<String, Map<String, String>> properties, + ClusterTopology topology) { + + // if the value is the cluster id, then update to primary + if (origValue.equals(String.valueOf(topology.getClusterId()))) { + return "primary"; + } + return origValue; + } }); //todo: john - this property should be removed @@ -2387,7 +2445,7 @@ public class BlueprintConfigurationProcessor { stormSiteMap.put("supervisor.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER")); stormSiteMap.put("nimbus.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER")); // Storm AMS integration - stormSiteMap.put("metrics.reporter.register", new NonTopologyUpdater() { + stormSiteNonTopologyMap.put("metrics.reporter.register", new NonTopologyUpdater() { @Override public String updateForClusterCreate(String propertyName, String origValue, @@ -2420,7 +2478,7 @@ public class BlueprintConfigurationProcessor { // KAFKA kafkaBrokerMap.put("kafka.ganglia.metrics.host", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER")); // KAFKA AMS integration - kafkaBrokerMap.put("kafka.metrics.reporters", new NonTopologyUpdater() { + kafkaBrokerNonTopologyMap.put("kafka.metrics.reporters", new NonTopologyUpdater() { @Override public String updateForClusterCreate(String propertyName, String origValue, http://git-wip-us.apache.org/repos/asf/ambari/blob/75555dc5/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java index 30351d8..68d5755 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java @@ -6662,6 +6662,78 @@ public class BlueprintConfigurationProcessorTest { assertEquals(createHostAddress(expectedHostNameNamenode, expectedPortNamenode) + "/hawq_default", hawqSite.get("hawq_dfs_url")); } + @Test + public void testDoUpdateForBlueprintExport_NonTopologyProperty__AtlasClusterName() throws Exception { + Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>(); + + Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap()); + + Collection<String> hgComponents = new HashSet<String>(); + hgComponents.add("ATLAS_SERVER"); + TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost")); + + Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>(); + hostGroups.add(group1); + + ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups); + Long clusterId = topology.getClusterId(); + Map<String, String> typeProps = new HashMap<String, String>(); + typeProps.put("atlas.cluster.name", String.valueOf(clusterId)); + properties.put("hive-site", typeProps); + + BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology); + configProcessor.doUpdateForBlueprintExport(); + + String updatedVal = properties.get("hive-site").get("atlas.cluster.name"); + assertEquals("primary", updatedVal); + } + + @Test + public void testDoUpdateForBlueprintExport_NonTopologyProperty() throws Exception { + String someString = "String.To.Represent.A.String.Value"; + Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>(); + + Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap()); + + Collection<String> hgComponents = new HashSet<String>(); + hgComponents.add("ATLAS_SERVER"); + hgComponents.add("HIVE_SERVER"); + hgComponents.add("KAFKA_BROKER"); + hgComponents.add("NIMBUS"); + TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost")); + + Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>(); + hostGroups.add(group1); + + ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups); + Long clusterId = topology.getClusterId(); + + Map<String, String> hiveSiteProps = new HashMap<String, String>(); + hiveSiteProps.put("atlas.cluster.name", String.valueOf(clusterId)); + hiveSiteProps.put("hive.exec.post.hooks", someString); + properties.put("hive-site", hiveSiteProps); + + Map<String, String> kafkaBrokerProps = new HashMap<String, String>(); + kafkaBrokerProps.put("kafka.metrics.reporters", someString); + properties.put("kafka-broker", kafkaBrokerProps); + + Map<String, String> stormSiteProps = new HashMap<String, String>(); + stormSiteProps.put("metrics.reporter.register", someString); + properties.put("storm-site", stormSiteProps); + + BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology); + configProcessor.doUpdateForBlueprintExport(); + + String atlasClusterName = properties.get("hive-site").get("atlas.cluster.name"); + String hiveExecPostHooks = properties.get("hive-site").get("hive.exec.post.hooks"); + String kafkaMetricsReporters = properties.get("kafka-broker").get("kafka.metrics.reporters"); + String metricsReporterRegister = properties.get("storm-site").get("metrics.reporter.register"); + assertEquals("primary", atlasClusterName); + assertEquals(someString, hiveExecPostHooks); + assertEquals(someString, kafkaMetricsReporters); + assertEquals(someString, metricsReporterRegister); + } + private Map<String, AdvisedConfiguration> createAdvisedConfigMap() { Map<String, AdvisedConfiguration> advMap = new HashMap<String, AdvisedConfiguration>();