Repository: ambari Updated Branches: refs/heads/trunk df3fca0fb -> c8e7c98fb
AMBARI-11087. Add Blueprint Support for Storm Nimbus High Availability. (rnettleton) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c8e7c98f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c8e7c98f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c8e7c98f Branch: refs/heads/trunk Commit: c8e7c98fb3a18948cbee1d14df46d345b1890dbb Parents: df3fca0 Author: Bob Nettleton <rnettle...@hortonworks.com> Authored: Wed May 13 12:33:08 2015 -0400 Committer: Bob Nettleton <rnettle...@hortonworks.com> Committed: Wed May 13 12:33:08 2015 -0400 ---------------------------------------------------------------------- .../BlueprintConfigurationProcessor.java | 47 +++++++- .../BlueprintConfigurationProcessorTest.java | 120 +++++++++++++++++++ 2 files changed, 165 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c8e7c98f/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java index 7938cc1..aef6664 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java @@ -1312,6 +1312,11 @@ public class BlueprintConfigurationProcessor { Map<String, Map<String, String>> properties, ClusterTopology topology) { + // return customer-supplied properties without updating them + if (isFQDNValue(origValue)) { + return origValue; + } + return doFormat(propertyUpdater.updateForClusterCreate(propertyName, origValue, properties, topology)); } @@ -1330,6 +1335,19 @@ public class BlueprintConfigurationProcessor { return propertyUpdater.getRequiredHostGroups(origValue, properties, topology); } + + /** + * Convenience method to determine if a property value is a + * customer-specified FQDN. + * + * @param value property value to examine + * @return true if the property represents an FQDN value + * false if the property does not represent an FQDN value + */ + public boolean isFQDNValue(String value) { + return !value.contains("%HOSTGROUP") && + !value.contains("localhost"); + } } /** @@ -1337,8 +1355,22 @@ public class BlueprintConfigurationProcessor { */ private static class YamlMultiValuePropertyDecorator extends AbstractPropertyValueDecorator { + // currently, only plain and single-quoted Yaml flows are supported by this updater + enum FlowStyle { + SINGLE_QUOTED, + PLAIN + } + + private final FlowStyle flowStyle; + public YamlMultiValuePropertyDecorator(PropertyUpdater propertyUpdater) { + // single-quote style is considered default by this updater + this(propertyUpdater, FlowStyle.SINGLE_QUOTED); + } + + protected YamlMultiValuePropertyDecorator(PropertyUpdater propertyUpdater, FlowStyle flowStyle) { super(propertyUpdater); + this.flowStyle = flowStyle; } /** @@ -1360,9 +1392,17 @@ public class BlueprintConfigurationProcessor { } else { isFirst = false; } - sb.append("'"); + + if (flowStyle == FlowStyle.SINGLE_QUOTED) { + sb.append("'"); + } + sb.append(value); - sb.append("'"); + + if (flowStyle == FlowStyle.SINGLE_QUOTED) { + sb.append("'"); + } + } sb.append("]"); } @@ -1649,6 +1689,9 @@ public class BlueprintConfigurationProcessor { stormSiteMap.put("nimbus.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER")); multiStormSiteMap.put("storm.zookeeper.servers", new YamlMultiValuePropertyDecorator(new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER"))); + multiStormSiteMap.put("nimbus.seeds", + new YamlMultiValuePropertyDecorator(new MultipleHostTopologyUpdater("NIMBUS"), YamlMultiValuePropertyDecorator.FlowStyle.PLAIN)); + // FALCON falconStartupPropertiesMap.put("*.broker.url", new SingleHostTopologyUpdater("FALCON_SERVER")); http://git-wip-us.apache.org/repos/asf/ambari/blob/c8e7c98f/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java index e7b0c64..390f73e 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java @@ -2315,6 +2315,93 @@ public class BlueprintConfigurationProcessorTest { } @Test + public void testDoUpdateForClusterCreate_Storm_Nimbus_HA_Enabled__defaultValues_YAML() throws Exception { + Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>(); + Map<String, String> typeProps = new HashMap<String, String>(); + typeProps.put("nimbus.seeds", "localhost"); + properties.put("storm-site", typeProps); + + Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap()); + + Collection<String> hgComponents = new HashSet<String>(); + hgComponents.add("NIMBUS"); + + TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost")); + + Collection<String> hgComponents2 = new HashSet<String>(); + hgComponents2.add("NIMBUS"); + + Set<String> hosts2 = new HashSet<String>(); + hosts2.add("testhost2"); + + TestHostGroup group2 = new TestHostGroup("group2", hgComponents2, hosts2); + + Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>(); + hostGroups.add(group1); + hostGroups.add(group2); + + ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups); + BlueprintConfigurationProcessor updater = new BlueprintConfigurationProcessor(topology); + + updater.doUpdateForClusterCreate(); + String updatedVal = topology.getConfiguration().getFullProperties().get("storm-site").get("nimbus.seeds"); + assertTrue("Updated YAML value should start with bracket", updatedVal.startsWith("[")); + assertTrue("Updated YAML value should end with bracket", updatedVal.endsWith("]")); + // remove the surrounding brackets + updatedVal = updatedVal.replaceAll("[\\[\\]]", ""); + + String[] hosts = updatedVal.split(","); + + Collection<String> expectedHosts = new HashSet<String>(); + expectedHosts.add("testhost"); + expectedHosts.add("testhost2"); + + assertEquals("Incorrect number of hosts found in updated Nimbus config property", 2, hosts.length); + for (String host : hosts) { + assertTrue("Expected host name = " + host + " not found in updated Nimbus config property", expectedHosts.contains(host)); + expectedHosts.remove(host); + } + } + + @Test + public void testDoUpdateForClusterCreate_Storm_Nimbus_HA_Enabled__FQDN_ValuesSpecified_YAML() throws Exception { + final String expectedValue = "[c6401.ambari.apache.org, c6402.ambari.apache.org]"; + Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>(); + Map<String, String> typeProps = new HashMap<String, String>(); + typeProps.put("nimbus.seeds", expectedValue); + properties.put("storm-site", typeProps); + + Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap()); + + Collection<String> hgComponents = new HashSet<String>(); + hgComponents.add("NIMBUS"); + + TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost")); + + Collection<String> hgComponents2 = new HashSet<String>(); + hgComponents2.add("NIMBUS"); + + Set<String> hosts2 = new HashSet<String>(); + hosts2.add("testhost2"); + + TestHostGroup group2 = new TestHostGroup("group2", hgComponents2, hosts2); + + Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>(); + hostGroups.add(group1); + hostGroups.add(group2); + + ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups); + BlueprintConfigurationProcessor updater = new BlueprintConfigurationProcessor(topology); + + updater.doUpdateForClusterCreate(); + String updatedVal = topology.getConfiguration().getFullProperties().get("storm-site").get("nimbus.seeds"); + + assertEquals("nimbus.seeds property should not be updated when FQDNs are specified in configuration", + expectedValue, updatedVal); + } + + + @Test public void testDoUpdateForClusterCreate_MProperty__defaultValues() throws Exception { Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>(); Map<String, String> typeProps = new HashMap<String, String>(); @@ -2714,6 +2801,7 @@ public class BlueprintConfigurationProcessorTest { Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>(); Map<String, String> typeProps = new HashMap<String, String>(); typeProps.put("storm.zookeeper.servers", "['%HOSTGROUP::group1%:9090','%HOSTGROUP::group2%:9091']"); + typeProps.put("nimbus.seeds", "[%HOSTGROUP::group1%, %HOSTGROUP::group4%]"); properties.put("storm-site", typeProps); Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap()); @@ -2722,12 +2810,14 @@ public class BlueprintConfigurationProcessorTest { hgComponents.add("NAMENODE"); hgComponents.add("SECONDARY_NAMENODE"); hgComponents.add("ZOOKEEPER_SERVER"); + hgComponents.add("NIMBUS"); TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost")); Collection<String> hgComponents2 = new HashSet<String>(); hgComponents2.add("DATANODE"); hgComponents2.add("HDFS_CLIENT"); hgComponents2.add("ZOOKEEPER_SERVER"); + hgComponents2.add("NIMBUS"); Set<String> hosts2 = new HashSet<String>(); hosts2.add("testhost2"); hosts2.add("testhost2a"); @@ -2742,10 +2832,17 @@ public class BlueprintConfigurationProcessorTest { hosts3.add("testhost3a"); TestHostGroup group3 = new TestHostGroup("group3", hgComponents3, hosts3); + Collection<String> hgComponents4 = new HashSet<String>(); + hgComponents4.add("NIMBUS"); + Set<String> hosts4 = new HashSet<String>(); + hosts4.add("testhost4"); + TestHostGroup group4 = new TestHostGroup("group4", hgComponents4, hosts4); + Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>(); hostGroups.add(group1); hostGroups.add(group2); hostGroups.add(group3); + hostGroups.add(group4); ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups); BlueprintConfigurationProcessor updater = new BlueprintConfigurationProcessor(topology); @@ -2770,6 +2867,29 @@ public class BlueprintConfigurationProcessorTest { assertTrue(expectedHosts.contains(host)); expectedHosts.remove(host); } + + + String updatedNimbusSeedsVal = topology.getConfiguration().getFullProperties().get("storm-site").get("nimbus.seeds"); + assertTrue("Updated YAML value should start with bracket", updatedNimbusSeedsVal.startsWith("[")); + assertTrue("Updated YAML value should end with bracket", updatedNimbusSeedsVal.endsWith("]")); + // remove the surrounding brackets + updatedNimbusSeedsVal = updatedNimbusSeedsVal.replaceAll("[\\[\\]]", ""); + + String[] nimbusHosts = updatedNimbusSeedsVal.split(","); + + Collection<String> expectedNimbusHosts = new HashSet<String>(); + expectedNimbusHosts.add("testhost"); + expectedNimbusHosts.add("testhost4"); + + assertEquals("Incorrect number of hosts found in updated Nimbus config property", 2, nimbusHosts.length); + for (String host : nimbusHosts) { + assertTrue("Expected Nimbus host = " + host + " not found in nimbus.seeds property value", expectedNimbusHosts.contains(host)); + expectedHosts.remove(host); + } + + + + } @Test