This is an automated email from the ASF dual-hosted git repository. benyoka pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 799e487 AMBARI-24964 stack advisor layout recommendation for add service request (benyoka) (#2662) 799e487 is described below commit 799e487c960a567bd541322f5d544e3548eab0c9 Author: benyoka <beny...@users.noreply.github.com> AuthorDate: Thu Nov 29 13:18:41 2018 +0100 AMBARI-24964 stack advisor layout recommendation for add service request (benyoka) (#2662) * AMBARI-24964 stack advisor layout recommendation for add service request (benyoka) * AMBARI-24964 fix review comments (benyoka) * AMBARI-24964 fix checkstyle (benyoka) --- .../services/stackadvisor/StackAdvisorRequest.java | 5 + .../recommendations/RecommendationResponse.java | 61 +++++ .../validations/ValidationResponse.java | 15 ++ .../ambari/server/configuration/Configuration.java | 19 ++ .../server/controller/AddServiceRequest.java | 8 + .../ambari/server/controller/internal/Stack.java | 7 +- .../server/topology/addservice/AddServiceInfo.java | 4 + .../addservice/AddServiceOrchestrator.java | 35 ++- .../topology/addservice/AutoHostgroupStrategy.java | 49 ++++ .../addservice/GroupByComponentsStrategy.java | 50 ++++ .../addservice/HostGroupForEachHostStrategy.java | 34 +++ .../topology/addservice/HostGroupStrategy.java | 29 +++ .../topology/addservice/StackAdvisorAdapter.java | 213 ++++++++++++++++ .../RecommendationResponseTest.java | 103 ++++++++ .../BlueprintConfigurationProcessorTest.java | 4 +- .../topology/addservice/HostGroupStrategyTest.java | 79 ++++++ .../addservice/StackAdvisorAdapterTest.java | 275 +++++++++++++++++++++ 17 files changed, 974 insertions(+), 16 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorRequest.java index 98c75f4..10baa33 100644 --- 
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorRequest.java @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.ambari.server.api.services.stackadvisor.recommendations.RecommendationResponse; import org.apache.ambari.server.state.ChangedConfigInfo; +import org.apache.ambari.server.state.StackId; import org.apache.commons.lang.StringUtils; import com.google.common.base.Preconditions; @@ -157,6 +158,10 @@ public class StackAdvisorRequest { this.instance = new StackAdvisorRequest(stackName, stackVersion); } + public static StackAdvisorRequestBuilder forStack(StackId stackId) { + return forStack(stackId.getStackName(), stackId.getStackVersion()); + } + public static StackAdvisorRequestBuilder forStack(String stackName, String stackVersion) { return new StackAdvisorRequestBuilder(stackName, stackVersion); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponse.java index d0d8601..99e9ab2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponse.java @@ -18,6 +18,11 @@ package org.apache.ambari.server.api.services.stackadvisor.recommendations; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -26,9 +31,13 @@ import java.util.Set; import 
org.apache.ambari.server.api.services.stackadvisor.StackAdvisorResponse; import org.apache.ambari.server.state.ValueAttributesInfo; +import org.apache.commons.lang3.tuple.Pair; +import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.annotate.JsonSerialize; +import com.google.common.collect.ImmutableMap; + /** * Recommendation response POJO. */ @@ -125,6 +134,12 @@ public class RecommendationResponse extends StackAdvisorResponse { public void setHostGroups(Set<HostGroup> hostGroups) { this.hostGroups = hostGroups; } + + public Map<String, Set<String>> getHostgroupComponentMap() { + return hostGroups.stream() + .flatMap(hg -> hg.getComponentNames().stream().map(comp -> Pair.of(hg.getName(), comp))) + .collect(groupingBy(Pair::getKey, mapping(Pair::getValue, toSet()))); + } } public static class BlueprintConfigurations { @@ -202,6 +217,25 @@ public class RecommendationResponse extends StackAdvisorResponse { public void setComponents(Set<Map<String, String>> components) { this.components = components; } + + @JsonIgnore + public Set<String> getComponentNames() { + return components.stream().map(comp -> comp.get("name")).collect(toSet()); + } + + public static Set<HostGroup> fromHostGroupComponents(Map<String, Set<String>> hostGroupComponents) { + return hostGroupComponents.entrySet().stream() + .map(entry -> create(entry.getKey(), entry.getValue())) + .collect(toSet()); + } + + public static HostGroup create(String name, Set<String> componentNames) { + HostGroup group = new HostGroup(); + group.setName(name); + Set<Map<String, String>> components = componentNames.stream().map(comp -> ImmutableMap.of("name", comp)).collect(toSet()); + group.setComponents(components); + return group; + } } public static class BlueprintClusterBinding { @@ -215,6 +249,20 @@ public class RecommendationResponse extends StackAdvisorResponse { public void setHostGroups(Set<BindingHostGroup> hostGroups) { 
this.hostGroups = hostGroups; } + + @JsonIgnore + public Map<String, Set<String>> getHostgroupHostMap() { + return hostGroups.stream().collect(toMap(BindingHostGroup::getName, BindingHostGroup::getHostNames)); + } + + public static BlueprintClusterBinding fromHostGroupHostMap(Map<String, Set<String>> hostGroupHosts) { + Set<BindingHostGroup> hostGroups = hostGroupHosts.entrySet().stream() + .map(entry -> BindingHostGroup.create(entry.getKey(), entry.getValue())) + .collect(toSet()); + BlueprintClusterBinding binding = new BlueprintClusterBinding(); + binding.setHostGroups(hostGroups); + return binding; + } } public static class BindingHostGroup { @@ -239,6 +287,19 @@ public class RecommendationResponse extends StackAdvisorResponse { public void setHosts(Set<Map<String, String>> hosts) { this.hosts = hosts; } + + @JsonIgnore + public Set<String> getHostNames() { + return hosts.stream().map(host -> host.get("fqdn")).collect(toSet()); + } + + public static BindingHostGroup create(String name, Set<String> hostNames) { + BindingHostGroup hostGroup = new BindingHostGroup(); + hostGroup.setName(name); + Set<Map<String, String>> hosts = hostNames.stream().map(hostName -> ImmutableMap.of("fqdn", hostName)).collect(toSet()); + hostGroup.setHosts(hosts); + return hostGroup; + } } public static class ConfigGroup { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/validations/ValidationResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/validations/ValidationResponse.java index f51428a..faf7ced 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/validations/ValidationResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/validations/ValidationResponse.java @@ -23,6 +23,8 @@ import java.util.Set; import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorResponse; import 
org.codehaus.jackson.annotate.JsonProperty; +import com.google.common.base.MoreObjects; + /** * Validation response POJO. */ @@ -116,6 +118,19 @@ public class ValidationResponse extends StackAdvisorResponse { public void setConfigName(String configName) { this.configName = configName; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("type", type) + .add("level", level) + .add("message", message) + .add("componentName", componentName) + .add("host", host) + .add("configType", configType) + .add("configName", configName) + .toString(); + } } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 18ba439..a4af46c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -68,6 +68,8 @@ import org.apache.ambari.server.security.encryption.CredentialProvider; import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.apache.ambari.server.state.services.RetryUpgradeActionService; import org.apache.ambari.server.state.stack.OsFamily; +import org.apache.ambari.server.topology.addservice.GroupByComponentsStrategy; +import org.apache.ambari.server.topology.addservice.HostGroupStrategy; import org.apache.ambari.server.upgrade.AbstractUpgradeCatalog; import org.apache.ambari.server.utils.AmbariPath; import org.apache.ambari.server.utils.DateUtils; @@ -2603,6 +2605,14 @@ public class Configuration { public static final ConfigurationProperty<Integer> DEFAULT_MAX_DEGREE_OF_PARALLELISM_FOR_UPGRADES = new ConfigurationProperty<>( "stack.upgrade.default.parallelism", 100); + /** + * Fully qualified class name of the strategy used to form host groups for add service request layout recommendation. 
+ */ + @Markdown(description = "Fully qualified class name of the strategy used to form host groups for add service request layout recommendation.") + public static final ConfigurationProperty<String> ADD_SERVICE_HOST_GROUP_STRATEGY = new ConfigurationProperty<>( + "addservice.hostgroup.strategy", GroupByComponentsStrategy.class.getName()); + + private static final Logger LOG = LoggerFactory.getLogger( Configuration.class); @@ -5537,6 +5547,15 @@ public class Configuration { } /** + * @return The class of the host group strategy for add service requests. + * @throws ClassNotFoundException if the specified class is not found + * @throws ClassCastException if the specified class is not a subclass of {@link HostGroupStrategy} + */ + public Class<? extends HostGroupStrategy> getAddServiceHostGroupStrategyClass() throws ClassNotFoundException { + return Class.forName(getProperty(ADD_SERVICE_HOST_GROUP_STRATEGY)).asSubclass(HostGroupStrategy.class); + } + + /** * Generates a markdown table which includes: * <ul> * <li>Property key name</li> diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java index f3217d1..9c0ccb5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java @@ -40,6 +40,7 @@ import java.util.function.Function; import org.apache.ambari.annotations.ApiIgnore; import org.apache.ambari.server.controller.internal.ProvisionAction; +import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.topology.ConfigRecommendationStrategy; import org.apache.ambari.server.topology.ConfigurableHelper; import org.apache.ambari.server.topology.Configuration; @@ -172,6 +173,13 @@ public final class AddServiceRequest { return stackVersion; } + @JsonIgnore + @ApiIgnore + public Optional<StackId> 
getStackId() { + return null != stackName && null != stackVersion + ? Optional.of(new StackId(stackName, stackVersion)) : Optional.empty(); + } + @JsonProperty(SERVICES) @ApiModelProperty(name = SERVICES) public Set<Service> getServices() { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java index 02e8b4e..b4fec0c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java @@ -206,6 +206,9 @@ public class Stack { return version; } + public StackId getStackId() { + return new StackId(name, version); + } Map<DependencyInfo, String> getDependencyConditionalServiceMap() { return dependencyConditionalServiceMap; @@ -214,9 +217,9 @@ public class Stack { /** * Get services contained in the stack. * - * @return collection of all services for the stack + * @return set of all services for the stack */ - public Collection<String> getServices() { + public Set<String> getServices() { return serviceComponents.keySet(); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java index 15c3b1f..f08ad65 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java @@ -48,6 +48,10 @@ public final class AddServiceInfo { this.config = config; } + public AddServiceInfo withNewServices(Map<String, Map<String, Set<String>>> services) { + return new AddServiceInfo(request, clusterName, stack, config, stages, services); + } + @Override public String toString() { return "AddServiceRequest(" + stages.getId() + ")"; diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java index a669b94..f4bd08a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java @@ -72,6 +72,9 @@ public class AddServiceOrchestrator { @Inject private ConfigHelper configHelper; + @Inject + private StackAdvisorAdapter stackAdvisorAdapter; + public RequestStatusResponse processAddServiceRequest(Cluster cluster, AddServiceRequest request) { LOG.info("Received {} request for {}: {}", request.getOperationType(), cluster.getClusterName(), request); @@ -108,18 +111,19 @@ public class AddServiceOrchestrator { try { stack = new Stack(stackId, controller); Set<String> existingServices = cluster.getServices().keySet(); + // process service declarations + for (AddServiceRequest.Service service : request.getServices()) { + checkAndLog(!stack.getServices().contains(service.getName()), + "Unknown service %s in stack %s", service, stack.getStackId()); + newServices.computeIfAbsent(service.getName(), __ -> new HashMap<>()); + } + // process component declarations for (AddServiceRequest.Component requestedComponent : request.getComponents()) { String serviceName = stack.getServiceForComponent(requestedComponent.getName()); - if (serviceName == null) { - String msg = String.format("No service found for component %s in stack %s", requestedComponent.getName(), stackId); - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - if (existingServices.contains(serviceName)) { - String msg = String.format("Service %s already exists in cluster %s", serviceName, cluster.getClusterName()); - LOG.error(msg); - throw new IllegalArgumentException(msg); - } + checkAndLog( serviceName == null, + "No service found for 
component %s in stack %s", requestedComponent.getName(), stackId); + checkAndLog( existingServices.contains(serviceName), + "Service %s already exists in cluster %s", serviceName, cluster.getClusterName()); newServices.computeIfAbsent(serviceName, __ -> new HashMap<>()) .computeIfAbsent(requestedComponent.getName(), __ -> new HashSet<>()) @@ -145,6 +149,14 @@ public class AddServiceOrchestrator { return validatedRequest; } + private static void checkAndLog(boolean errorCondition, String errorMessage, Object... messageParams) { + if (errorCondition) { + String msg = String.format(errorMessage, messageParams); + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + } + /** * Requests layout recommendation from the stack advisor. * @return new request, updated based on the recommended layout @@ -152,8 +164,7 @@ public class AddServiceOrchestrator { */ private AddServiceInfo recommendLayout(AddServiceInfo request) { LOG.info("Recommending layout for {}", request); - // TODO implement - return request; + return stackAdvisorAdapter.recommendLayout(request); } /** diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AutoHostgroupStrategy.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AutoHostgroupStrategy.java new file mode 100644 index 0000000..4eaaa90 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AutoHostgroupStrategy.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology.addservice; + +import java.util.Map; +import java.util.Set; + +/** + * A strategy that chooses another strategy based on cluster size. + */ +public class AutoHostgroupStrategy implements HostGroupStrategy { + + private final int largeClusterThreshold; + + public AutoHostgroupStrategy() { + this(10); + } + + public AutoHostgroupStrategy(int largeClusterThreshold) { + this.largeClusterThreshold = largeClusterThreshold; + } + + @Override + public Map<String, Set<String>> calculateHostGroups(Map<String, Set<String>> hostComponentMap) { + HostGroupStrategy strategy = + hostComponentMap.size() <= largeClusterThreshold ? new HostGroupForEachHostStrategy() : new GroupByComponentsStrategy(); + return strategy.calculateHostGroups(hostComponentMap); + } + + public int getLargeClusterThreshold() { + return largeClusterThreshold; + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/GroupByComponentsStrategy.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/GroupByComponentsStrategy.java new file mode 100644 index 0000000..1f5d6bc --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/GroupByComponentsStrategy.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology.addservice; + +import static com.google.common.collect.Lists.newArrayList; +import static java.util.Comparator.comparing; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toMap; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.IntStream; + + +public class GroupByComponentsStrategy implements HostGroupStrategy { + @Override + public Map<String, Set<String>> calculateHostGroups(Map<String, Set<String>> hostComponentMap) { + // create components -> hosts map, the values will be the host groups + List<Set<String>> hostGroups = newArrayList( + hostComponentMap.entrySet().stream().collect( + groupingBy(Map.Entry::getValue, mapping(Map.Entry::getKey, toCollection(TreeSet::new))) ).values()); // hosts names are sorted in the host group + hostGroups.sort(comparing(hosts -> hosts.iterator().next())); // alphabetical order by the first hostname in the group to have consistent outcome + + // give a name to each host group and add to a map + Map<String, Set<String>> hostgroupMap = IntStream.range(0, hostGroups.size()).boxed() + .collect(toMap(i -> "host_group_" + (i + 1), i -> hostGroups.get(i))); + + return hostgroupMap; + 
} +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupForEachHostStrategy.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupForEachHostStrategy.java new file mode 100644 index 0000000..1cba73b --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupForEachHostStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.topology.addservice; + +import static java.util.stream.Collectors.toMap; + +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +public class HostGroupForEachHostStrategy implements HostGroupStrategy { + @Override + public Map<String, Set<String>> calculateHostGroups(Map<String, Set<String>> hostComponentMap) { + return hostComponentMap.keySet().stream() + .collect(toMap(host -> "host_group_" + host, host -> ImmutableSet.of(host))); + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupStrategy.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupStrategy.java new file mode 100644 index 0000000..b541f88 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupStrategy.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.topology.addservice; + +import java.util.Map; +import java.util.Set; + +/** + * Base interface for host group creation strategies + */ +public interface HostGroupStrategy { + Map<String, Set<String>> calculateHostGroups(Map<String, Set<String>> hostComponentMap); +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapter.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapter.java new file mode 100644 index 0000000..45f0e59 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapter.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.topology.addservice; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.Maps.transformValues; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import javax.inject.Inject; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorException; +import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorHelper; +import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorRequest; +import org.apache.ambari.server.api.services.stackadvisor.recommendations.RecommendationResponse; +import org.apache.ambari.server.api.services.stackadvisor.validations.ValidationResponse; +import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.StackId; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import com.google.inject.ConfigurationException; +import com.google.inject.Injector; +import com.google.inject.ProvisionException; + +public class StackAdvisorAdapter { + + @Inject + private AmbariManagementController managementController; + + @Inject + private StackAdvisorHelper stackAdvisorHelper; + + @Inject + private Configuration serverConfig; + + @Inject + private Injector injector; + + private static final Logger LOG = 
LoggerFactory.getLogger(StackAdvisorHelper.class);
+
+  /**
+   * Recommends component layout for the new services to add. If the request contains explicit layout for some
+   * components this will be added to the stack advisor input, in addition to the layout of the services that
+   * already exist in the cluster. The recommended layout is validated (validation issues are only logged as
+   * warnings) and only the recommendations for the new services are kept.
+   *
+   * @param info the add service request; its cluster name, stack and new services are read
+   * @return the result of {@code info.withNewServices(...)} called with the recommended layout of the new services
+   * @throws IllegalArgumentException if an {@code AmbariException} or {@code StackAdvisorException} is raised
+   *         while the recommendation is computed
+   */
+  AddServiceInfo recommendLayout(AddServiceInfo info) {
+    try {
+      Cluster cluster = managementController.getClusters().getCluster(info.clusterName());
+      // existing layout as [service -> component -> hosts]
+      Map<String, Map<String, Set<String>>> clusterServices = transformValues(
+        cluster.getServices(),
+        service -> transformValues(service.getServiceComponents(), component -> component.getServiceComponentsHosts()));
+
+      // Requested component layout will be added to the StackAdvisor input in addition to existing
+      // component layout.
+      Map<String, Map<String, Set<String>>> allServices = mergeDisjunctMaps(clusterServices, info.newServices());
+
+      Map<String, Set<String>> componentsToHosts = getComponentHostMap(allServices);
+
+      Map<String, Set<String>> hostsToComponents = getHostComponentMap(componentsToHosts);
+      List<String> hosts = ImmutableList.copyOf(cluster.getHostNames());
+      hosts.forEach(host -> hostsToComponents.putIfAbsent(host, new HashSet<>())); // just in case there are hosts that have no components
+
+      Map<String, Set<String>> hostGroups = getHostGroupStrategy().calculateHostGroups(hostsToComponents);
+
+      StackAdvisorRequest request = StackAdvisorRequest.StackAdvisorRequestBuilder
+        .forStack(info.getStack().getStackId())
+        .ofType(StackAdvisorRequest.StackAdvisorRequestType.HOST_GROUPS)
+        .forHosts(hosts)
+        .forServices(allServices.keySet())
+        .forHostComponents(hostsToComponents)
+        .forHostsGroupBindings(hostGroups)
+        .withComponentHostsMap(componentsToHosts)
+        .withGPLLicenseAccepted(serverConfig.getGplLicenseAccepted())
+        .build();
+      RecommendationResponse response = stackAdvisorHelper.recommend(request);
+
+      // the same [hostgroup -> hosts] map feeds both the layout conversion and the validation request
+      Map<String, Set<String>> recommendedHostGroups =
+        response.getRecommendations().getBlueprintClusterBinding().getHostgroupHostMap();
+
+      Map<String, Map<String, Set<String>>> recommendedLayout = getRecommendedLayout(
+        recommendedHostGroups,
+        response.getRecommendations().getBlueprint().getHostgroupComponentMap(),
+        info.getStack()::getServiceForComponent);
+
+      Set<ValidationResponse.ValidationItem> validationItems = validateRecommendedLayout(info.getStack().getStackId(),
+        recommendedLayout,
+        recommendedHostGroups);
+      if (!validationItems.isEmpty()) {
+        LOG.warn("Issues found during recommended topology validation:\n{}", Joiner.on('\n').join(validationItems));
+      }
+
+      // Keep the recommendations for new services only
+      keepNewServicesOnly(recommendedLayout, info.newServices());
+
+      return info.withNewServices(recommendedLayout);
+    }
+    catch (AmbariException|StackAdvisorException ex) {
+      throw new IllegalArgumentException("Layout recommendation failed.", ex);
+    }
+  }
+
+  /**
+   * Sends the recommended layout to the stack advisor for validation.
+   *
+   * @param stackId the stack the validation request is built for
+   * @param recommendedLayout the layout to validate as [service -> component -> hosts]
+   * @param recommendedHostgroups the [hostgroup -> hosts] bindings passed along with the request
+   * @return the items of the validation response
+   * @throws StackAdvisorException declared by this method; presumably raised by the stack advisor call
+   */
+  Set<ValidationResponse.ValidationItem> validateRecommendedLayout(StackId stackId,
+                                                                   Map<String,Map<String,Set<String>>> recommendedLayout,
+                                                                   Map<String, Set<String>> recommendedHostgroups) throws StackAdvisorException {
+    Map<String, Set<String>> componentsToHosts = getComponentHostMap(recommendedLayout);
+    Map<String, Set<String>> hostsToComponents = getHostComponentMap(componentsToHosts);
+    List<String> hosts = ImmutableList.copyOf(hostsToComponents.keySet());
+
+    StackAdvisorRequest request = StackAdvisorRequest.StackAdvisorRequestBuilder
+      .forStack(stackId)
+      .ofType(StackAdvisorRequest.StackAdvisorRequestType.HOST_GROUPS)
+      .forHosts(hosts)
+      .forServices(recommendedLayout.keySet())
+      .forHostComponents(hostsToComponents)
+      .forHostsGroupBindings(recommendedHostgroups)
+      .withComponentHostsMap(componentsToHosts)
+      .withGPLLicenseAccepted(serverConfig.getGplLicenseAccepted())
+      .build();
+    ValidationResponse response = stackAdvisorHelper.validate(request);
+
+    return response.getItems();
+  }
+
+  /**
+   * Removes every service from the recommended layout that is not among the new services. The recommended
+   * layout is modified in place, so it must be a mutable map.
+   *
+   * @param recommendedLayout the [service -> component -> hosts] map to prune
+   * @param newServices the new services; only its keys are used
+   */
+  static void keepNewServicesOnly(Map<String,Map<String,Set<String>>> recommendedLayout,
+                                  Map<String,Map<String,Set<String>>> newServices) {
+    recommendedLayout.keySet().retainAll(newServices.keySet());
+  }
+
+  /**
+   * Converts the host group based layout to a [service -> component -> hosts] map. Host groups that have
+   * components but no hosts assigned in {@code hostGroupHosts} contribute nothing to the result.
+   *
+   * @param hostGroupHosts [hostgroup -> hosts] map
+   * @param hostGroupComponents [hostgroup -> components] map
+   * @param componentToService resolves the name of the owning service for a component name
+   * @return the layout as [service -> component -> hosts]
+   */
+  static Map<String, Map<String, Set<String>>> getRecommendedLayout(Map<String, Set<String>> hostGroupHosts,
+                                                                    Map<String, Set<String>> hostGroupComponents,
+                                                                    Function<String, String> componentToService) {
+    Map<String, Set<String>> componentHostMap = hostGroupComponents.entrySet().stream()
+      .filter(entry -> hostGroupHosts.get(entry.getKey()) != null) // a host group without host binding would cause NPE below
+      .flatMap(entry -> entry.getValue().stream().map(comp -> Pair.of(comp, entry.getKey()))) // component -> hostgroup
+      .flatMap(cmpHg -> hostGroupHosts.get(cmpHg.getValue()).stream().map(host -> Pair.of(cmpHg.getKey(), host))) // component -> host
+      .collect(groupingBy(Pair::getKey, mapping(Pair::getValue, toSet())));// group by component
+
+    return componentHostMap.entrySet().stream().collect(
+      groupingBy(
+        cmpHost -> componentToService.apply(cmpHost.getKey()),
+        toMap(Map.Entry::getKey, Map.Entry::getValue)));
+  }
+
+  /**
+   * Transform a map of component -> hosts to a map of hosts -> components
+   * @param componentHostMap the map to transform
+   * @return the transformed map
+   */
+  static Map<String, Set<String>> getHostComponentMap(Map<String, Set<String>> componentHostMap) {
+    return componentHostMap.entrySet().stream()
+      .flatMap(compHosts -> compHosts.getValue().stream().map(host -> Pair.of(host, compHosts.getKey())))
+      .collect(groupingBy(Pair::getKey, mapping(Pair::getValue, toSet())));
+  }
+
+  /**
+   * Extracts a [component -> hosts] map from the [service -> component -> hosts] map. Services
+   * with empty component map will be ignored
+   * @param serviceComponentHostMap the input map
+   * @return the extracted map
+   */
+  static Map<String, Set<String>> getComponentHostMap(Map<String, Map<String, Set<String>>> serviceComponentHostMap) {
+    return serviceComponentHostMap.values().stream()
+      .reduce(StackAdvisorAdapter::mergeDisjunctMaps)
+      .orElseGet(HashMap::new); // only allocate the fallback map when there are no services at all
+  }
+
+  /**
+   * Merges two maps that must not have any key in common.
+   *
+   * @param map1 the first map
+   * @param map2 the second map
+   * @return a new map containing the entries of both input maps
+   * @throws IllegalArgumentException if the two maps have common keys
+   */
+  static <S, T> Map<S, T> mergeDisjunctMaps(Map<? extends S, ? extends T> map1, Map<? extends S, ? extends T> map2) {
+    Sets.SetView<? extends S> commonKeys = Sets.intersection(map1.keySet(), map2.keySet());
+    checkArgument(commonKeys.isEmpty(), "Maps must be disjunct. Common keys: %s", commonKeys);
+    Map<S, T> merged = new HashMap<>(map1);
+    merged.putAll(map2);
+    return merged;
+  }
+
+  /**
+   * Obtains an instance of the host group strategy class named by the server configuration from the injector.
+   *
+   * @return the host group strategy instance
+   * @throws IllegalStateException if the strategy class cannot be resolved or instantiated
+   */
+  HostGroupStrategy getHostGroupStrategy() {
+    try {
+      return injector.getInstance(serverConfig.getAddServiceHostGroupStrategyClass());
+    }
+    catch (ClassNotFoundException | ClassCastException |ConfigurationException | ProvisionException ex) {
+      throw new IllegalStateException("Cannot load host group strategy", ex);
+    }
+  }
+
+}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponseTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponseTest.java
new file mode 100644
index 0000000..80e25e5
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponseTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.services.stackadvisor.recommendations;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Tests the accessors of {@link RecommendationResponse} that expose the host groups of the
+ * blueprint and of the cluster binding as plain maps and sets of names.
+ */
+public class RecommendationResponseTest {
+
+  private final RecommendationResponse response = new RecommendationResponse();
+
+  /**
+   * Populates the response with two host groups, each having components and host bindings.
+   */
+  @Before
+  public void setUp() {
+    RecommendationResponse.Blueprint blueprint = new RecommendationResponse.Blueprint();
+    blueprint.setHostGroups(ImmutableSet.of(
+      hostGroup("host_group_1", "NAMENODE", "ZOOKEEPER_SERVER"),
+      hostGroup("host_group_2", "DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT")
+    ));
+
+    RecommendationResponse.BlueprintClusterBinding clusterBinding = new RecommendationResponse.BlueprintClusterBinding();
+    clusterBinding.setHostGroups(ImmutableSet.of(
+      hostGroupBinding("host_group_1", "c7401", "c7402"),
+      hostGroupBinding("host_group_2", "c7403", "c7404", "c7405")
+    ));
+
+    RecommendationResponse.Recommendation recommendation = new RecommendationResponse.Recommendation();
+    recommendation.setBlueprint(blueprint);
+    recommendation.setBlueprintClusterBinding(clusterBinding);
+
+    response.setRecommendations(recommendation);
+  }
+
+  /** The blueprint must report its host groups as a [hostgroup -> component names] map. */
+  @Test
+  public void blueprint_getHostgroupComponentMap() {
+    ImmutableMap<String, Set<String>> expected = ImmutableMap.of(
+      "host_group_1", ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER"),
+      "host_group_2", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"));
+    assertEquals(expected, response.getRecommendations().getBlueprint().getHostgroupComponentMap());
+  }
+
+  /** Each host group must report the names of its components. */
+  @Test
+  public void hostGroup_getComponentNames() {
+    Map<String, RecommendationResponse.HostGroup> hostGroups =
+      response.getRecommendations().getBlueprint().getHostGroups().stream()
+        .collect(toMap(RecommendationResponse.HostGroup::getName, identity()));
+    assertEquals(ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER"), hostGroups.get("host_group_1").getComponentNames());
+    assertEquals(ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"), hostGroups.get("host_group_2").getComponentNames());
+  }
+
+  /** The cluster binding must report its host groups as a [hostgroup -> host names] map. */
+  @Test
+  public void blueprintClusterBinding_getHostgroupHostMap() {
+    ImmutableMap<String, Set<String>> expected = ImmutableMap.of(
+      "host_group_1", ImmutableSet.of("c7401", "c7402"),
+      "host_group_2", ImmutableSet.of("c7403", "c7404", "c7405"));
+    assertEquals(expected, response.getRecommendations().getBlueprintClusterBinding().getHostgroupHostMap());
+  }
+
+  /**
+   * Builds a blueprint host group; every component is represented as a single entry map keyed by "name".
+   */
+  private static RecommendationResponse.HostGroup hostGroup(String name, String... components) {
+    RecommendationResponse.HostGroup hostGroup = new RecommendationResponse.HostGroup();
+    hostGroup.setName(name);
+    Set<Map<String, String>> hostGroupComponents =
+      Arrays.stream(components).map(comp -> ImmutableMap.of("name", comp)).collect(toSet());
+    hostGroup.setComponents(hostGroupComponents);
+    return hostGroup;
+  }
+
+  /**
+   * Builds a cluster binding host group; every host is represented as a single entry map keyed by "fqdn".
+   */
+  private static RecommendationResponse.BindingHostGroup hostGroupBinding(String name, String... hosts) {
+    RecommendationResponse.BindingHostGroup hostGroup = new RecommendationResponse.BindingHostGroup();
+    hostGroup.setName(name);
+    Set<Map<String, String>> hostGroupHosts =
+      Arrays.stream(hosts).map(host -> ImmutableMap.of("fqdn", host)).collect(toSet());
+    hostGroup.setHosts(hostGroupHosts);
+    return hostGroup;
+  }
+}
\ No newline at end of file
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
index e7cd271..d1a22f0 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
@@ -10336,7 +10336,7 @@ public class BlueprintConfigurationProcessorTest extends EasyMockSupport {
 
     ClusterTopology topology = createNiceMock(ClusterTopology.class);
     Stack stack = createNiceMock(Stack.class);
-    Collection<String> services = ImmutableList.of("HDFS");
+    Set<String> services = ImmutableSet.of("HDFS");
     expect(stack.getServices()).andReturn(services).anyTimes();
     expect(stack.getConfiguration()).andReturn(stackConfig).anyTimes();
     expect(topology.getConfiguration()).andReturn(clusterConfig).anyTimes();
@@ -10360,7 +10360,7 @@ public class BlueprintConfigurationProcessorTest extends EasyMockSupport {
 
     ClusterTopology topology = createNiceMock(ClusterTopology.class);
     Stack stack = createNiceMock(Stack.class);
-    Collection<String> services = ImmutableList.of("HDFS");
+    Set<String> services = ImmutableSet.of("HDFS");
     expect(stack.getServices()).andReturn(services).anyTimes();
     expect(stack.getConfiguration()).andReturn(stackConfig).anyTimes();
     expect(topology.getConfiguration()).andReturn(clusterConfig).anyTimes();
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/HostGroupStrategyTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/HostGroupStrategyTest.java new file mode 100644 index 0000000..2732ffa --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/HostGroupStrategyTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Tests the host group strategies: each strategy receives the same [host -> components] map and
+ * must produce the expected [hostgroup -> hosts] map.
+ */
+public class HostGroupStrategyTest {
+
+  // Input fixture: [host -> components]. Every component name is a separate set element.
+  public static final Map<String, Set<String>> HOST_COMPONENTS = ImmutableMap.<String, Set<String>>builder()
+    .put("c7401", ImmutableSet.of("ZOOKEEPER_SERVER", "NAMENODE", "HDFS_CLIENT"))
+    .put("c7402", ImmutableSet.of("ZOOKEEPER_SERVER", "NAMENODE", "HDFS_CLIENT"))
+    .put("c7403", ImmutableSet.of("ZOOKEEPER_SERVER", "NAMENODE", "HDFS_CLIENT"))
+    .put("c7404", ImmutableSet.of("ZOOKEEPER_SERVER", "NAMENODE", "HDFS_CLIENT", "SECONDARY_NAMENODE"))
+    .put("c7405", ImmutableSet.of("HIVE_SERVER", "KAFKA_BROKER", "ZOOKEEPER_CLIENT"))
+    .put("c7406", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7407", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7408", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .build();
+
+  // Expected result when every host gets its own host group
+  static final Map<String, Set<String>> HOST_GROUPS_FOR_EACH_HOST = ImmutableMap.<String, Set<String>>builder()
+    .put("host_group_c7401", ImmutableSet.of("c7401"))
+    .put("host_group_c7402", ImmutableSet.of("c7402"))
+    .put("host_group_c7403", ImmutableSet.of("c7403"))
+    .put("host_group_c7404", ImmutableSet.of("c7404"))
+    .put("host_group_c7405", ImmutableSet.of("c7405"))
+    .put("host_group_c7406", ImmutableSet.of("c7406"))
+    .put("host_group_c7407", ImmutableSet.of("c7407"))
+    .put("host_group_c7408", ImmutableSet.of("c7408"))
+    .build();
+
+  // Expected result when hosts having the same set of components share a host group
+  static final Map<String, Set<String>> HOST_GROUPS_BY_COMPONENTS = ImmutableMap.of(
+    "host_group_1", ImmutableSet.of("c7401", "c7402", "c7403"),
+    "host_group_2", ImmutableSet.of("c7404"),
+    "host_group_3", ImmutableSet.of("c7405"),
+    "host_group_4", ImmutableSet.of("c7406", "c7407", "c7408")
+  );
+
+
+  @Test
+  public void hostGroupForEachHostStrategy() {
+    assertEquals(HOST_GROUPS_FOR_EACH_HOST, new HostGroupForEachHostStrategy().calculateHostGroups(HOST_COMPONENTS));
+  }
+
+  @Test
+  public void groupByComponentsStrategy() {
+    assertEquals(HOST_GROUPS_BY_COMPONENTS, new GroupByComponentsStrategy().calculateHostGroups(HOST_COMPONENTS));
+  }
+
+  @Test
+  public void autoHostgroupStrategy() {
+    // the fixture has 8 hosts: the default strategy yields one group per host, while the one created with 7 groups by components
+    assertEquals(HOST_GROUPS_FOR_EACH_HOST, new AutoHostgroupStrategy().calculateHostGroups(HOST_COMPONENTS));
+    assertEquals(HOST_GROUPS_BY_COMPONENTS, new AutoHostgroupStrategy(7).calculateHostGroups(HOST_COMPONENTS));
+  }
+}
\ No newline at end of file
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java
new file mode 100644
index 0000000..e652f78
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.stream.Collectors.toMap;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorHelper;
+import org.apache.ambari.server.api.services.stackadvisor.recommendations.RecommendationResponse;
+import org.apache.ambari.server.api.services.stackadvisor.validations.ValidationResponse;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.internal.Stack;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.StackId;
+import org.apache.commons.lang3.tuple.Pair;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.TestSubject;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+
+/**
+ * Tests {@link StackAdvisorAdapter}: the static map transformation helpers are tested directly with
+ * fixture maps, {@code recommendLayout} is tested against mocked collaborators prepared in {@link #setUp()}.
+ */
+@RunWith(EasyMockRunner.class)
+public class StackAdvisorAdapterTest {
+
+  @Mock
+  private AmbariManagementController managementController;
+
+  @Mock
+  private StackAdvisorHelper stackAdvisorHelper;
+
+  @Mock
+  private Configuration serverConfig;
+
+  @Mock
+  private Injector injector;
+
+  @Mock
+  private Stack stack;
+
+  @TestSubject
+  private StackAdvisorAdapter adapter = new StackAdvisorAdapter();
+
+  // [component -> hosts] view of the fixture layout
+  private static final Map<String, Set<String>> COMPONENT_HOST_MAP = ImmutableMap.<String, Set<String>>builder()
+    .put("NAMENODE", ImmutableSet.of("c7401", "c7402"))
+    .put("DATANODE", ImmutableSet.of("c7403", "c7404", "c7405", "c7406"))
+    .put("HDFS_CLIENT", ImmutableSet.of("c7403", "c7404", "c7405", "c7406"))
+    .put("ZOOKEEPER_SERVER", ImmutableSet.of("c7401", "c7402"))
+    .put("ZOOKEEPER_CLIENT", ImmutableSet.of("c7401", "c7402", "c7403", "c7404", "c7405", "c7406"))
+    .build();
+
+  // [service -> component -> hosts] view of the fixture layout
+  private static final Map<String, Map<String, Set<String>>> SERVICE_COMPONENT_HOST_MAP_1 = ImmutableMap.of(
+    "HDFS", ImmutableMap.of(
+      "NAMENODE", ImmutableSet.of("c7401", "c7402"),
+      "DATANODE", ImmutableSet.of("c7403", "c7404", "c7405", "c7406"),
+      "HDFS_CLIENT", ImmutableSet.of("c7403", "c7404", "c7405", "c7406")),
+    "ZOOKEEPER", ImmutableMap.of(
+      "ZOOKEEPER_SERVER", ImmutableSet.of("c7401", "c7402"),
+      "ZOOKEEPER_CLIENT", ImmutableSet.of("c7401", "c7402", "c7403", "c7404", "c7405", "c7406")));
+
+  // same as above plus two services without any component
+  private static final Map<String, Map<String, Set<String>>> SERVICE_COMPONENT_HOST_MAP_2 = ImmutableMap.<String, Map<String, Set<String>>>builder()
+    .putAll(SERVICE_COMPONENT_HOST_MAP_1)
+    .put("HIVE", emptyMap())
+    .put("SPARK2", emptyMap())
+    .build();
+
+  // [host -> components] view of the fixture layout
+  private static final Map<String, Set<String>> HOST_COMPONENT_MAP = ImmutableMap.<String, Set<String>>builder()
+    .put("c7401", ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT"))
+    .put("c7402", ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT"))
+    .put("c7403", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7404", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7405", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7406", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .build();
+
+  @Test
+  public void getHostComponentMap() {
+    assertEquals(HOST_COMPONENT_MAP, StackAdvisorAdapter.getHostComponentMap(COMPONENT_HOST_MAP));
+  }
+
+  @Test
+  public void getComponentHostMap() {
+    // services without components (HIVE, SPARK2) must not show up in the result
+    assertEquals(COMPONENT_HOST_MAP, StackAdvisorAdapter.getComponentHostMap(SERVICE_COMPONENT_HOST_MAP_2));
+  }
+
+  @Test
+  public void getRecommendedLayout() {
+    Map<String, Set<String>> hostGroups = ImmutableMap.of(
+      "host_group1", ImmutableSet.of("c7401", "c7402"),
+      "host_group2", ImmutableSet.of("c7403", "c7404", "c7405", "c7406"));
+
+    Map<String, Set<String>> hostGroupComponents = ImmutableMap.of(
+      "host_group1", ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT"),
+      "host_group2", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"));
+
+    // keys are component names, values are the owning services
+    Map<String, String> componentToService = ImmutableMap.<String, String>builder()
+      .put("NAMENODE", "HDFS")
+      .put("DATANODE", "HDFS")
+      .put("HDFS_CLIENT", "HDFS")
+      .put("ZOOKEEPER_SERVER", "ZOOKEEPER")
+      .put("ZOOKEEPER_CLIENT", "ZOOKEEPER")
+      .build();
+
+    assertEquals(SERVICE_COMPONENT_HOST_MAP_1,
+      StackAdvisorAdapter.getRecommendedLayout(hostGroups, hostGroupComponents, componentToService::get));
+  }
+
+  @Test
+  public void mergeDisjunctMaps() {
+    Map<String, String> map1 = ImmutableMap.of("key1", "value1", "key2", "value2");
+    Map<String, String> map2 = ImmutableMap.of("key3", "value3", "key4", "value4");
+    assertEquals(
+      ImmutableMap.of("key1", "value1", "key2", "value2", "key3", "value3", "key4", "value4"),
+      StackAdvisorAdapter.mergeDisjunctMaps(map1, map2));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void mergeDisjunctMaps_invalidInput() {
+    // "key2" is present in both maps
+    Map<String, String> map1 = ImmutableMap.of("key1", "value1", "key2", "value2");
+    Map<String, String> map2 = ImmutableMap.of("key2", "value2", "key3", "value3");
+    StackAdvisorAdapter.mergeDisjunctMaps(map1, map2);
+  }
+
+  @Test
+  public void keepNewServicesOnly() {
+    Map<String, Map<String, Set<String>>> newServices = ImmutableMap.of(
+      "KAFKA", emptyMap(),
+      "PIG", emptyMap());
+
+    Map<String, Map<String, Set<String>>> recommendationForNewServices = ImmutableMap.of(
+      "KAFKA", ImmutableMap.of("KAFKA_BROKER", ImmutableSet.of("c7405")),
+      "PIG", ImmutableMap.of("PIG_CLIENT", ImmutableSet.of("c7405", "c7406")));
+
+    // mutable copy is needed as keepNewServicesOnly modifies its first argument
+    Map<String, Map<String, Set<String>>> recommendations = new HashMap<>(SERVICE_COMPONENT_HOST_MAP_1);
+    recommendations.putAll(recommendationForNewServices);
+
+    StackAdvisorAdapter.keepNewServicesOnly(recommendations, newServices);
+    assertEquals(recommendationForNewServices, recommendations);
+  }
+
+  /**
+   * Prepares the mocks used by {@link #recommendLayout()}: a two host cluster with HDFS and ZOOKEEPER,
+   * a canned recommendation that puts KAFKA_BROKER on c7402 and an empty validation response.
+   */
+  @Before
+  public void setUp() throws Exception {
+    Cluster cluster = mock(Cluster.class);
+    expect(cluster.getHostNames()).andReturn(ImmutableSet.of("c7401", "c7402"));
+    expect(cluster.getServices()).andReturn(ImmutableMap.of(
+      "HDFS",
+      service("HDFS", ImmutableMap.of("NAMENODE", ImmutableSet.of("c7401"), "HDFS_CLIENT", ImmutableSet.of("c7401", "c7402"))),
+      "ZOOKEEPER",
+      service("ZOOKEEPER", ImmutableMap.of("ZOOKEEPER_SERVER", ImmutableSet.of("c7401"), "ZOOKEEPER_CLIENT", ImmutableSet.of("c7401", "c7402")))));
+    Clusters clusters = mock(Clusters.class);
+    expect(clusters.getCluster(anyString())).andReturn(cluster).anyTimes();
+    expect(managementController.getClusters()).andReturn(clusters).anyTimes();
+    replay(clusters, cluster, managementController);
+
+    expect(serverConfig.getGplLicenseAccepted()).andReturn(Boolean.FALSE).anyTimes();
+    // raw cast: the class literal has to be adapted to the declared return type of the mocked method
+    expect(serverConfig.getAddServiceHostGroupStrategyClass()).andReturn((Class)GroupByComponentsStrategy.class).anyTimes();
+    replay(serverConfig);
+
+    expect(injector.getInstance(GroupByComponentsStrategy.class)).andReturn(new GroupByComponentsStrategy()).anyTimes();
+    replay(injector);
+
+    RecommendationResponse response = new RecommendationResponse();
+    RecommendationResponse.Recommendation recommendation = new RecommendationResponse.Recommendation();
+    response.setRecommendations(recommendation);
+    RecommendationResponse.BlueprintClusterBinding binding = RecommendationResponse.BlueprintClusterBinding.fromHostGroupHostMap(
+      ImmutableMap.of(
+        "hostgroup-1", ImmutableSet.of("c7401"),
+        "hostgroup-2", ImmutableSet.of("c7402")));
+    recommendation.setBlueprintClusterBinding(binding);
+    RecommendationResponse.Blueprint blueprint = new RecommendationResponse.Blueprint();
+    blueprint.setHostGroups(RecommendationResponse.HostGroup.fromHostGroupComponents(
+      ImmutableMap.of(
+        "hostgroup-1", ImmutableSet.of("NAMENODE", "HDFS_CLIENT", "ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT"),
+        "hostgroup-2", ImmutableSet.of("HDFS_CLIENT", "ZOOKEEPER_CLIENT", "KAFKA_BROKER"))
+    ));
+    recommendation.setBlueprint(blueprint);
+    expect(stackAdvisorHelper.recommend(anyObject())).andReturn(response);
+
+    ValidationResponse validationResponse = new ValidationResponse();
+    validationResponse.setItems(emptySet());
+    expect(stackAdvisorHelper.validate(anyObject())).andReturn(validationResponse);
+    replay(stackAdvisorHelper);
+
+    expect(stack.getStackId()).andReturn(new StackId("HDP", "3.0")).anyTimes();
+    // keys are component names, values are the owning services
+    ImmutableMap<String, String> componentToService = ImmutableMap.<String, String>builder()
+      .put("KAFKA_BROKER", "KAFKA")
+      .put("NAMENODE", "HDFS")
+      .put("HDFS_CLIENT", "HDFS")
+      .put("ZOOKEEPER_SERVER", "ZOOKEEPER")
+      .put("ZOOKEEPER_CLIENT", "ZOOKEEPER")
+      .build();
+    expect(stack.getServiceForComponent(anyString())).andAnswer(() -> componentToService.get(getCurrentArguments()[0])).anyTimes();
+    replay(stack);
+  }
+
+  /**
+   * Creates a mocked service from a [component -> hosts] map; the mocked components report the given hosts.
+   */
+  private static Service service(String name, ImmutableMap<String,ImmutableSet<String>> componentHostMap) {
+    Service service = mock(Service.class);
+    expect(service.getName()).andReturn(name).anyTimes();
+    Map<String, ServiceComponent> serviceComponents = componentHostMap.entrySet().stream()
+      .map(entry -> {
+        ServiceComponent component = mock(ServiceComponent.class);
+        expect(component.getName()).andReturn(entry.getKey()).anyTimes();
+        expect(component.getServiceComponentsHosts()).andReturn(entry.getValue()).anyTimes();
+        replay(component);
+        return Pair.of(entry.getKey(), component);
+      })
+      .collect(toMap(Pair::getKey, Pair::getValue));
+    expect(service.getServiceComponents()).andReturn(serviceComponents).anyTimes();
+    replay(service);
+    return service;
+  }
+
+  @Test
+  public void recommendLayout() {
+    // KAFKA_BROKER is requested without explicit hosts, its location has to come from the recommendation
+    Map<String, Map<String, Set<String>>> newServices = ImmutableMap.of(
+      "KAFKA",
+      ImmutableMap.of("KAFKA_BROKER", emptySet()));
+
+    AddServiceInfo info = new AddServiceInfo(null, "c1", stack, org.apache.ambari.server.topology.Configuration.newEmpty(), null, newServices);
+    AddServiceInfo infoWithRecommendations = adapter.recommendLayout(info);
+
+    Map<String, Map<String, Set<String>>> expectedNewLayout = ImmutableMap.of(
+      "KAFKA",
+      ImmutableMap.of("KAFKA_BROKER", ImmutableSet.of("c7402"))
+    );
+
+    assertEquals(expectedNewLayout, infoWithRecommendations.newServices());
+  }
+
+}
\ No newline at end of file