This is an automated email from the ASF dual-hosted git repository. benyoka pushed a commit to branch branch-feature-AMBARI-14714 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-feature-AMBARI-14714 by this push: new 1edb62b [AMBARI-24464] Integrate Blueprints with the new MPackAdvisor API for Configuration Recommendations (#2259) 1edb62b is described below commit 1edb62ba06373f13895d7ba7db0a0d953daa0979 Author: benyoka <beny...@users.noreply.github.com> AuthorDate: Fri Sep 14 13:01:35 2018 +0200 [AMBARI-24464] Integrate Blueprints with the new MPackAdvisor API for Configuration Recommendations (#2259) * AMBARI-24464 blueprint mpack advisor WIP (benyoka) * AMBARI-24464 blueprint mpack advisor WIP #2 (benyoka) * AMBARI-24464 copy config to service instances * AMBARI-24464 copy config to service instances reverted (benyoka) * AMBARI-24464 fix review comments + unit test (benyoka) * AMBARI-24464 fix some more review comments (benyoka) * AMBARI-24464 fix some more review comments #2 (benyoka) * AMBARI-24464 fix some more review comments #3 (benyoka) * AMBARI-24464 fix failing unit tests (benyoka) * AMBARI-24464 fix failing unit tests #2 (benyoka) --- .../api/services/AdvisorBlueprintProcessor.java | 50 +++ .../MpackAdvisorBlueprintProcessor.java | 298 +++++++++++++ .../services/mpackadvisor/MpackAdvisorRequest.java | 23 +- .../services/mpackadvisor/MpackAdvisorRunner.java | 2 +- .../mpackadvisor/commands/MpackAdvisorCommand.java | 77 ++-- .../MpackRecommendationResponse.java | 13 + .../StackAdvisorBlueprintProcessor.java | 18 +- .../ambari/server/configuration/Configuration.java | 15 +- .../ambari/server/controller/AmbariServer.java | 2 + .../ambari/server/controller/ControllerModule.java | 6 + .../internal/ConfigurationResourceProvider.java | 3 + .../internal/MpackAdvisorResourceProvider.java | 36 +- .../internal/ProvisionClusterRequest.java | 2 + .../topology/ClusterConfigurationRequest.java | 14 +- .../ambari/server/topology/ClusterTopology.java | 4 + .../server/topology/ClusterTopologyImpl.java | 92 ++-- .../ambari/server/topology/MpackInstance.java | 22 + 
.../ambari/server/topology/ServiceInstance.java | 7 + .../ambari/server/topology/TopologyManager.java | 8 +- .../apache/ambari/server/utils/ExceptionUtils.java | 84 ++++ .../MpackAdvisorBlueprintProcessorTest.java | 485 +++++++++++++++++++++ .../commands/MpackAdvisorCommandTest.java | 5 +- .../internal/ProvisionClusterRequestTest.java | 12 + 23 files changed, 1138 insertions(+), 140 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AdvisorBlueprintProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AdvisorBlueprintProcessor.java new file mode 100644 index 0000000..52f9ecf --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AdvisorBlueprintProcessor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.api.services; + +import java.util.Map; + +import org.apache.ambari.server.api.services.mpackadvisor.MpackAdvisorBlueprintProcessor; +import org.apache.ambari.server.controller.internal.ConfigurationTopologyException; +import org.apache.ambari.server.topology.ClusterTopology; + +import com.google.inject.ImplementedBy; + +/** + * Common interface for topology/configuration recommendation engines. Currently there is a legacy implementation for + * stack advisor and a new implementation for mpack advisor. + * <p>See: + * {@link org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor} + * {@link org.apache.ambari.server.api.services.mpackadvisor.MpackAdvisorBlueprintProcessor} + * </p> + */ +@ImplementedBy(MpackAdvisorBlueprintProcessor.class) +public interface AdvisorBlueprintProcessor { + + String RECOMMENDATION_FAILED = "Configuration recommendation failed."; + String INVALID_RESPONSE = "Configuration recommendation returned with invalid response."; + + /** + * Recommend configurations by the advisor, then store the results in cluster topology. 
+ * @param clusterTopology cluster topology instance + * @param userProvidedConfigurations User configurations of cluster provided in Blueprint + Cluster template + */ + void adviseConfiguration(ClusterTopology clusterTopology, Map<String, Map<String, String>> userProvidedConfigurations) throws ConfigurationTopologyException; + +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorBlueprintProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorBlueprintProcessor.java new file mode 100644 index 0000000..6c125c2 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorBlueprintProcessor.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.api.services.mpackadvisor; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static org.apache.ambari.server.utils.ExceptionUtils.unchecked; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.ambari.server.api.services.AdvisorBlueprintProcessor; +import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.api.services.mpackadvisor.recommendations.MpackRecommendationResponse; +import org.apache.ambari.server.controller.internal.ConfigurationTopologyException; +import org.apache.ambari.server.state.ComponentInfo; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.ValueAttributesInfo; +import org.apache.ambari.server.topology.AdvisedConfiguration; +import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.Component; +import org.apache.ambari.server.topology.ConfigRecommendationStrategy; +import org.apache.ambari.server.topology.Configuration; +import org.apache.ambari.server.topology.HostGroup; +import org.apache.ambari.server.topology.HostGroupInfo; +import org.apache.ambari.server.topology.MpackInstance; +import org.apache.ambari.server.topology.ResolvedComponent; +import org.apache.ambari.server.topology.ServiceInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.inject.Singleton; + +/** + * Generate advised 
configurations for blueprint cluster provisioning by the mpack advisor. + */ +@Singleton +public class MpackAdvisorBlueprintProcessor implements AdvisorBlueprintProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(MpackAdvisorBlueprintProcessor.class); + + private static MpackAdvisorHelper mpackAdvisorHelper; + + private static AmbariMetaInfo metaInfo; + + public static void init(MpackAdvisorHelper instance, AmbariMetaInfo ambariMetaInfo) { + mpackAdvisorHelper = instance; + metaInfo = ambariMetaInfo; + } + + private static final Map<String, String> userContext = ImmutableMap.of("operation", "ClusterCreate"); + + /** + * {@inheritDoc} + */ + public void adviseConfiguration(ClusterTopology clusterTopology, Map<String, Map<String, String>> userProvidedConfigurations) throws ConfigurationTopologyException { + MpackAdvisorRequest request = createMpackAdvisorRequest(clusterTopology, MpackAdvisorRequest.MpackAdvisorRequestType.CONFIGURATIONS); + try { + MpackRecommendationResponse response = mpackAdvisorHelper.recommend(request); + addAllAdvisedConfigurationsToTopology(response, clusterTopology, userProvidedConfigurations); + } catch (MpackAdvisorException e) { + throw new ConfigurationTopologyException(RECOMMENDATION_FAILED, e); + } catch (IllegalArgumentException e) { + throw new ConfigurationTopologyException(INVALID_RESPONSE, e); + } + } + + private MpackAdvisorRequest createMpackAdvisorRequest(ClusterTopology clusterTopology, + MpackAdvisorRequest.MpackAdvisorRequestType requestType) { + Map<String, Map<String, Set<String>>> mpackComponentsHostsMap = gatherMackComponentsHostsMap(clusterTopology); + Set<MpackInstance> mpacks = copyAndEnrichMpackInstances(clusterTopology); + Configuration configuration = clusterTopology.getConfiguration(); + return MpackAdvisorRequest.MpackAdvisorRequestBuilder + .forStack() + .forMpackInstances(mpacks) + .forHosts(gatherHosts(clusterTopology)) + .forHostsGroupBindings(gatherHostGroupBindings(clusterTopology)) + 
.forComponentHostsMap(getHostgroups(clusterTopology)) + .withMpacksToComponentsHostsMap(mpackComponentsHostsMap) + .withConfigurations(calculateConfigs(configuration)) + .withUserContext(userContext) + .ofType(requestType) + .build(); + } + + private Set<MpackInstance> copyAndEnrichMpackInstances(ClusterTopology topology) { + // Copy mpacks + Set<MpackInstance> mpacks = topology.getMpacks().stream().map(MpackInstance::copy).collect(toSet()); + + // Add missing service instances + Map<StackId, Set<String>> mpackServices = topology.getComponents().collect( + groupingBy(ResolvedComponent::stackId, + mapping(comp -> comp.serviceInfo().getName(), toSet()))); + for (MpackInstance mpack: mpacks) { + if (!mpackServices.containsKey(mpack.getStackId())) { + LOG.warn("No services declared for mpack {}.", mpack.getStackId()); + } + else { + Set<String> existingMpackServices = mpack.getServiceInstances().stream().map(ServiceInstance::getType).collect(toSet()); + for(String service: mpackServices.get(mpack.getStackId())) { + if (existingMpackServices.contains(service)) { + LOG.debug("Mpack instance {} already contains service {}", mpack.getStackId(), service); + } + else { + LOG.debug("Adding service {} to mpack instance {}", service, mpack.getStackId()); + mpack.getServiceInstances().add(new ServiceInstance(service, service, null, mpack)); + } + } + } + } + return mpacks; + } + + private Collection<MpackRecommendationResponse.HostGroup> getHostgroups(ClusterTopology topology) { + // TODO: this will need to rewritten for true multi-everything (multiple mpacks of the same type/version under + // different names) + Map<StackId, String> mpackNameByStackId = topology.getMpacks().stream().collect( + toMap( + MpackInstance::getStackId, + MpackInstance::getMpackName + )); + + topology.getComponentsByHostGroup().entrySet().stream().collect( + toMap( + // Map.Entry::getKey, + e -> e.getKey(), + components -> + components.getValue().stream() + .map(comp -> + 
MpackRecommendationResponse.HostGroup.createComponent(comp.componentName(), + mpackNameByStackId.get(comp.stackId()), + comp.serviceName().orElseGet(() -> comp.serviceType()))) + .collect(toSet()) + ) + ); + + return null; + } + + private Map<String, Set<String>> gatherHostGroupBindings(ClusterTopology clusterTopology) { + Map<String, Set<String>> hgBindings = Maps.newHashMap(); + for (Map.Entry<String, HostGroupInfo> hgEnrty: clusterTopology.getHostGroupInfo().entrySet()) { + hgBindings.put(hgEnrty.getKey(), Sets.newCopyOnWriteArraySet(hgEnrty.getValue().getHostNames())); + } + return hgBindings; + } + + private Map<String, Set<Component>> gatherHostGroupComponents(ClusterTopology clusterTopology) { + Map<String, Set<Component>> hgComponentsMap = Maps.newHashMap(); + for (Map.Entry<String, HostGroup> hgEnrty: clusterTopology.getBlueprint().getHostGroups().entrySet()) { + hgComponentsMap.put(hgEnrty.getKey(), Sets.newCopyOnWriteArraySet(hgEnrty.getValue().getComponents())); + } + return hgComponentsMap; + } + + private Map<String, Map<String, Map<String, String>>> calculateConfigs(Configuration configuration) { + Map<String, Map<String, Map<String, String>>> result = Maps.newHashMap(); + Map<String, Map<String, String>> fullProperties = configuration.getFullProperties(); + for (Map.Entry<String, Map<String, String>> siteEntry : fullProperties.entrySet()) { + Map<String, Map<String, String>> propsMap = Maps.newHashMap(); + propsMap.put("properties", siteEntry.getValue()); + result.put(siteEntry.getKey(), propsMap); + } + return result; + } + + private Map<String, Map<String, Set<String>>> gatherMackComponentsHostsMap(ClusterTopology topology) { + Map<String, Map<String, Set<String>>> mpackComponentsHostsMap = new HashMap<>(); + for (Map.Entry<String, Set<ResolvedComponent>> hgToComps : topology.getComponentsByHostGroup().entrySet()) { + String hostGroup = hgToComps.getKey(); + Set<ResolvedComponent> components = hgToComps.getValue(); + Set<String> hosts = 
topology.getHostGroupInfo().get(hostGroup).getHostNames(); + for (ResolvedComponent component: components) { + String mpackName = component.stackId().getStackName(); // TODO: support multiple mpacks under different names? + mpackComponentsHostsMap + .computeIfAbsent(mpackName, __ -> new HashMap<>()) + .put(component.componentName(), hosts); + } + } + return mpackComponentsHostsMap; + } + + private Set<String> getMpacksForComponent(Component component, Map<String, Set<String>> componentToMpacks) { + Set<String> mpacksForComponent = component.getMpackInstance() != null + ? ImmutableSet.of(component.getMpackInstance()) + : componentToMpacks.getOrDefault(component.getName(), ImmutableSet.of()); + if (mpacksForComponent.isEmpty()) { + LOG.error("No mpack found for component [{}]", component.getName()); + } + return mpacksForComponent; + } + + private List<String> gatherHosts(ClusterTopology clusterTopology) { + List<String> hosts = Lists.newArrayList(); + for (Map.Entry<String, HostGroupInfo> entry : clusterTopology.getHostGroupInfo().entrySet()) { + hosts.addAll(entry.getValue().getHostNames()); + } + return hosts; + } + + private void addAllAdvisedConfigurationsToTopology(MpackRecommendationResponse response, + ClusterTopology topology, Map<String, Map<String, String>> userProvidedConfigurations) { + Preconditions.checkArgument(response.getRecommendations() != null, + "Recommendation response is empty."); + Preconditions.checkArgument(response.getRecommendations().getBlueprint() != null, + "Blueprint field is missing from the recommendation response."); + + MpackRecommendationResponse.Blueprint blueprint = response.getRecommendations().getBlueprint(); + + addAdvisedConfigurationToTopology(blueprint.getConfigurations(), topology, userProvidedConfigurations); + + blueprint.getMpackInstances().forEach( mpack -> { + mpack.getServiceInstances().forEach( svc -> { + addAdvisedConfigurationToTopology(svc.getConfigurations(), topology, userProvidedConfigurations); + }); + }); 
+ } + + private void addAdvisedConfigurationToTopology(Map<String, MpackRecommendationResponse.BlueprintConfigurations> recommendedConfigurations, + ClusterTopology topology, Map<String, Map<String, String>> userProvidedConfigurations) { + if (null != recommendedConfigurations) { + for (Map.Entry<String, MpackRecommendationResponse.BlueprintConfigurations> configEntry : recommendedConfigurations.entrySet()) { + String configType = configEntry.getKey(); + // add recommended config type only if related service is present in Blueprint + if (topology.isValidConfigType(configType)) { + MpackRecommendationResponse.BlueprintConfigurations blueprintConfig = filterBlueprintConfig(configType, configEntry.getValue(), + userProvidedConfigurations, topology); + topology.getAdvisedConfigurations().put(configType, new AdvisedConfiguration( + blueprintConfig.getProperties(), blueprintConfig.getPropertyAttributes())); + } + } + } + } + + + /** + * Remove user defined properties from Stack Advisor output in case of ONLY_STACK_DEFAULTS_APPLY or + * ALWAYS_APPLY_DONT_OVERRIDE_CUSTOM_VALUES. 
+ */ + private MpackRecommendationResponse.BlueprintConfigurations filterBlueprintConfig(String configType, MpackRecommendationResponse.BlueprintConfigurations config, + Map<String, Map<String, String>> userProvidedConfigurations, + ClusterTopology topology) { + if (topology.getConfigRecommendationStrategy() == ConfigRecommendationStrategy.ONLY_STACK_DEFAULTS_APPLY || + topology.getConfigRecommendationStrategy() == ConfigRecommendationStrategy + .ALWAYS_APPLY_DONT_OVERRIDE_CUSTOM_VALUES) { + if (userProvidedConfigurations.containsKey(configType)) { + MpackRecommendationResponse.BlueprintConfigurations newConfig = new MpackRecommendationResponse.BlueprintConfigurations(); + Map<String, String> filteredProps = Maps.filterKeys(config.getProperties(), + Predicates.not(Predicates.in(userProvidedConfigurations.get(configType).keySet()))); + newConfig.setProperties(Maps.newHashMap(filteredProps)); + + if (config.getPropertyAttributes() != null) { + Map<String, ValueAttributesInfo> filteredAttributes = Maps.filterKeys(config.getPropertyAttributes(), + Predicates.not(Predicates.in(userProvidedConfigurations.get(configType).keySet()))); + newConfig.setPropertyAttributes(Maps.newHashMap(filteredAttributes)); + } + return newConfig; + } + } + return config; + } + + Set<String> getStackComponents(StackId stackId) { + return unchecked(() -> metaInfo.getStack(stackId)).getServices().stream() + .flatMap( svc -> svc.getComponents().stream() ) + .map(ComponentInfo::getName) + .collect(toSet()); + } + +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorRequest.java index f41ce68..f71b8f5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorRequest.java @@ -18,6 +18,8 
@@ package org.apache.ambari.server.api.services.mpackadvisor; +import static java.util.stream.Collectors.toSet; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -33,6 +35,7 @@ import org.apache.ambari.server.topology.MpackInstance; import org.apache.ambari.server.topology.ServiceInstance; import org.apache.commons.lang.StringUtils; +import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonProperty; /** @@ -42,8 +45,8 @@ public class MpackAdvisorRequest { private MpackAdvisorRequestType requestType; private List<String> hosts = new ArrayList<>(); - // Mpack Instance Name -> Component Name -> host(s) - private Map<String, Map<String, Set<String>>> componentHostsMap = new HashMap<>(); + // Mpack Instance Name -> Component Name -> host(s) + private Map<String, Map<String, Set<String>>> mpackComponentHostsMap = new HashMap<>(); private Map<String, Map<String, Map<String, String>>> configurations = new HashMap<>(); private List<ChangedConfigInfo> changedConfigurations = new LinkedList<>(); private Map<String, String> userContext = new HashMap<>(); @@ -96,6 +99,16 @@ public class MpackAdvisorRequest { public Collection<MpackInstance> getMpackInstances() { return this.mpacks; } + + @JsonIgnore + public Collection<String> getServiceInstanceNames() { + return mpacks.stream() + .flatMap(mpack -> mpack.getServiceInstances().stream()) + .map(ServiceInstance::getName) + .collect(toSet()); + } + + } public Recommendation getRecommendation() { @@ -180,7 +193,7 @@ public class MpackAdvisorRequest { public MpackAdvisorRequest.MpackAdvisorRequestBuilder withMpacksToComponentsHostsMap( Map<String, Map<String, Set<String>>> componentHostsMap) { - this.instance.componentHostsMap = componentHostsMap; + this.instance.mpackComponentHostsMap = componentHostsMap; return this; } @@ -256,8 +269,8 @@ public class MpackAdvisorRequest { return StringUtils.join(serviceInstancesTypeList, ","); } - public Map<String, Map<String, 
Set<String>>> getComponentHostsMap() { - return componentHostsMap; + public Map<String, Map<String, Set<String>>> getMpackComponentHostsMap() { + return mpackComponentHostsMap; } public String getHostsCommaSeparated() { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorRunner.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorRunner.java index ec7e94f..cdcaf06 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorRunner.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/MpackAdvisorRunner.java @@ -100,7 +100,7 @@ public class MpackAdvisorRunner { } catch (Exception ioe) { String message = "Error executing Mpack Advisor: "; LOG.error(message, ioe); - throw new MpackAdvisorException(message + ioe.getMessage()); + throw new MpackAdvisorException(message + ioe.getMessage(), ioe); } finally { if (process != null) { process.destroy(); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/commands/MpackAdvisorCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/commands/MpackAdvisorCommand.java index f1ee909..9ae70f6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/commands/MpackAdvisorCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/commands/MpackAdvisorCommand.java @@ -27,7 +27,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -65,6 +64,8 @@ import org.codehaus.jackson.node.ObjectNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * Parent for all commands. 
*/ @@ -164,11 +165,13 @@ public abstract class MpackAdvisorCommand<T extends MpackAdvisorResponse> extend protected abstract void validate(MpackAdvisorRequest request) throws MpackAdvisorException; - protected ObjectNode adjust(String servicesJSON, MpackAdvisorRequest request) { + protected ObjectNode adjust(String servicesJSON, MpackAdvisorRequest request, MpackInstance mpack) { try { ObjectNode root = (ObjectNode) this.mapper.readTree(servicesJSON); + Preconditions.checkNotNull(root.get(SERVICES_PROPERTY), + "No services found for mpack %s (%s-%s). Is the mpack installed?", mpack.getMpackName(), mpack.getMpackType(), mpack.getMpackVersion()); - populateComponentHostsMap(root, request.getComponentHostsMap()); + populateComponentHostsMap(root, request.getMpackComponentHostsMap()); populateServiceAdvisors(root); populateServicesConfigurations(root, request); populateClusterLevelConfigurations(root, request); @@ -180,7 +183,7 @@ public abstract class MpackAdvisorCommand<T extends MpackAdvisorResponse> extend // should not happen String message = "Error parsing services.json file content: " + e.getMessage(); LOG.warn(message, e); - throw new WebApplicationException(Response.status(Status.BAD_REQUEST).entity(message).build()); + throw new WebApplicationException(e, Response.status(Status.BAD_REQUEST).entity(message).build()); } } @@ -222,30 +225,26 @@ public abstract class MpackAdvisorCommand<T extends MpackAdvisorResponse> extend } private void populateServicesConfigurations(ObjectNode root, MpackAdvisorRequest request) { - Collection<MpackInstance> mpackInstances = request.getMpackInstances(); - Iterator<MpackInstance> mpackInstanceItr = mpackInstances.iterator(); ObjectNode configurationsNode = root.putObject(CONFIGURATIONS_PROPERTY); - while (mpackInstanceItr.hasNext()) { - MpackInstance mpackInstance = mpackInstanceItr.next(); - Collection<ServiceInstance> serviceInstances = mpackInstance.getServiceInstances(); - Iterator<ServiceInstance> serviceInstanceItr = 
serviceInstances.iterator(); - while (serviceInstanceItr.hasNext()) { - ServiceInstance serviceInstance = serviceInstanceItr.next(); + for (MpackInstance mpackInstance: request.getMpackInstances()) { + for (ServiceInstance serviceInstance: mpackInstance.getServiceInstances()) { Configuration configurations = serviceInstance.getConfiguration(); - // We have read configuration properties in attributes as it allows the following format: - // eg: {configType -> {attributeName -> {propName, attributeValue}}} - Map<String, Map<String, Map<String, String>>> configProperties = configurations.getAttributes(); - for (String siteName : configProperties.keySet()) { - ObjectNode siteNode = configurationsNode.putObject(siteName); - - Map<String, Map<String, String>> siteMap = configProperties.get(siteName); - for (String properties : siteMap.keySet()) { - ObjectNode propertiesNode = siteNode.putObject(properties); - - Map<String, String> propertiesMap = siteMap.get(properties); - for (String propertyName : propertiesMap.keySet()) { - String propertyValue = propertiesMap.get(propertyName); - propertiesNode.put(propertyName, propertyValue); + if (null != configurations) { + // We have read configuration properties in attributes as it allows the following format: + // eg: {configType -> {attributeName -> {propName, attributeValue}}} + Map<String, Map<String, Map<String, String>>> configProperties = configurations.getAttributes(); + for (String siteName : configProperties.keySet()) { + ObjectNode siteNode = configurationsNode.putObject(siteName); + + Map<String, Map<String, String>> siteMap = configProperties.get(siteName); + for (String properties : siteMap.keySet()) { + ObjectNode propertiesNode = siteNode.putObject(properties); + + Map<String, String> propertiesMap = siteMap.get(properties); + for (String propertyName : propertiesMap.keySet()) { + String propertyValue = propertiesMap.get(propertyName); + propertiesNode.put(propertyName, propertyValue); + } } } } @@ -270,15 +269,9 
@@ public abstract class MpackAdvisorCommand<T extends MpackAdvisorResponse> extend private void populateComponentHostsMap(ObjectNode root, Map<String, Map<String, Set<String>>> mpacksToComponentsHostsMap) { ArrayNode services = (ArrayNode) root.get(SERVICES_PROPERTY); - Iterator<JsonNode> servicesIter = services.getElements(); - - while (servicesIter.hasNext()) { - JsonNode service = servicesIter.next(); + for (JsonNode service: services) { ArrayNode components = (ArrayNode) service.get(SERVICES_COMPONENTS_PROPERTY); - Iterator<JsonNode> componentsIter = components.getElements(); - - while (componentsIter.hasNext()) { - JsonNode component = componentsIter.next(); + for (JsonNode component: components) { ObjectNode componentInfo = (ObjectNode) component.get(COMPONENT_INFO_PROPERTY); String componentMpackName = componentInfo.get(COMPONENT_MPACK_NAME_PROPERTY).getTextValue(); String componentName = componentInfo.get(COMPONENT_NAME_PROPERTY).getTextValue(); @@ -301,14 +294,11 @@ public abstract class MpackAdvisorCommand<T extends MpackAdvisorResponse> extend protected void populateServiceAdvisors(ObjectNode root) { ArrayNode services = (ArrayNode) root.get(SERVICES_PROPERTY); - Iterator<JsonNode> servicesIter = services.getElements(); - ObjectNode version = (ObjectNode) root.get("Versions"); String stackName = version.get("stack_name").asText(); String stackVersion = version.get("stack_version").asText(); - while (servicesIter.hasNext()) { - JsonNode service = servicesIter.next(); + for (JsonNode service: services) { ObjectNode serviceVersion = (ObjectNode) service.get(STACK_SERVICES_PROPERTY); String serviceName = serviceVersion.get("service_name").getTextValue(); try { @@ -439,9 +429,7 @@ public abstract class MpackAdvisorCommand<T extends MpackAdvisorResponse> extend try { JsonNode root = mapper.readTree(hostsJSON); - Iterator<JsonNode> iterator = root.get("items").getElements(); - while (iterator.hasNext()) { - JsonNode next = iterator.next(); + for (JsonNode 
next: root.get("items")) { String hostName = next.get("Hosts").get("host_name").getTextValue(); registeredHosts.add(hostName); } @@ -491,15 +479,14 @@ public abstract class MpackAdvisorCommand<T extends MpackAdvisorResponse> extend LOG.warn(message); throw new MpackAdvisorException(message); } - - updatedServicesInfoForMpack = adjust(servicesInfoFromMpack, request); + updatedServicesInfoForMpack = adjust(servicesInfoFromMpack, request, mpackInstance); if (LOG.isDebugEnabled()) { LOG.debug("Services information: {}", servicesInfoFromMpack); } - for (Iterator<JsonNode> svcInstanceItr = updatedServicesInfoForMpack.get("services").getElements(); svcInstanceItr.hasNext(); ) { - updatedServicesAcrossMpacks.add(svcInstanceItr.next()); + for (JsonNode svcInstance: updatedServicesInfoForMpack.get("services")) { + updatedServicesAcrossMpacks.add(svcInstance); } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/recommendations/MpackRecommendationResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/recommendations/MpackRecommendationResponse.java index 9ccf4c5..8ccbe1e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/recommendations/MpackRecommendationResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/mpackadvisor/recommendations/MpackRecommendationResponse.java @@ -18,6 +18,10 @@ package org.apache.ambari.server.api.services.mpackadvisor.recommendations; +import static org.apache.ambari.server.controller.internal.MpackAdvisorResourceProvider.BLUEPRINT_HOST_GROUPS_COMPONENTS_MPACK_INSTANCE_PROPERTY; +import static org.apache.ambari.server.controller.internal.MpackAdvisorResourceProvider.BLUEPRINT_HOST_GROUPS_COMPONENTS_NAME_PROPERTY; +import static org.apache.ambari.server.controller.internal.MpackAdvisorResourceProvider.BLUEPRINT_HOST_GROUPS_COMPONENTS_SERVICE_INSTANCE_PROPERTY; + import java.util.ArrayList; import 
java.util.Collection; import java.util.HashMap; @@ -30,6 +34,8 @@ import org.apache.ambari.server.state.ValueAttributesInfo; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.annotate.JsonSerialize; +import com.google.common.collect.ImmutableMap; + /** * Recommendation response POJO. */ @@ -300,6 +306,13 @@ public class MpackRecommendationResponse extends MpackAdvisorResponse { public void setComponents(Set<Map<String, String>> components) { this.components = components; } + + public static Map<String, String> createComponent(String componentName, String mpackInstance, String serviceInstance) { + return ImmutableMap.of( + BLUEPRINT_HOST_GROUPS_COMPONENTS_NAME_PROPERTY, componentName, + BLUEPRINT_HOST_GROUPS_COMPONENTS_MPACK_INSTANCE_PROPERTY, mpackInstance, + BLUEPRINT_HOST_GROUPS_COMPONENTS_SERVICE_INSTANCE_PROPERTY, serviceInstance); + } } public static class BlueprintClusterBinding { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorBlueprintProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorBlueprintProcessor.java index 87b64a5..564b333 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorBlueprintProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorBlueprintProcessor.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.ambari.server.api.services.AdvisorBlueprintProcessor; import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorRequest.StackAdvisorRequestType; import org.apache.ambari.server.api.services.stackadvisor.recommendations.RecommendationResponse; import org.apache.ambari.server.api.services.stackadvisor.recommendations.RecommendationResponse.BlueprintConfigurations; @@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory; import 
com.google.common.base.Preconditions; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -49,30 +51,20 @@ import com.google.inject.Singleton; * Generate advised configurations for blueprint cluster provisioning by the stack advisor. */ @Singleton -public class StackAdvisorBlueprintProcessor { +public class StackAdvisorBlueprintProcessor implements AdvisorBlueprintProcessor { private static final Logger LOG = LoggerFactory.getLogger(StackAdvisorBlueprintProcessor.class); private static StackAdvisorHelper stackAdvisorHelper; - static final String RECOMMENDATION_FAILED = "Configuration recommendation failed."; - static final String INVALID_RESPONSE = "Configuration recommendation returned with invalid response."; - public static void init(StackAdvisorHelper instance) { stackAdvisorHelper = instance; } - private static final Map<String, String> userContext; - static - { - userContext = new HashMap<>(); - userContext.put("operation", "ClusterCreate"); - } + private static final Map<String, String> userContext = ImmutableMap.of("operation", "ClusterCreate"); /** - * Recommend configurations by the stack advisor, then store the results in cluster topology. 
- * @param clusterTopology cluster topology instance - * @param userProvidedConfigurations User configurations of cluster provided in Blueprint + Cluster template + * {@inheritDoc} */ public void adviseConfiguration(ClusterTopology clusterTopology, Map<String, Map<String, String>> userProvidedConfigurations) throws ConfigurationTopologyException { for (StackId stackId : clusterTopology.getStackIds()) { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 7d13e08..ac5fbb0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -445,12 +445,21 @@ public class Configuration { * The location and name of the Python mpack advisor script executed when * configuring services. */ - @Markdown(description = "The location and name of the Python stack advisor script executed when configuring services.") + @Markdown(description = "The location and name of the Python mpack advisor script executed when configuring services.") public static final ConfigurationProperty<String> MPACK_ADVISOR_SCRIPT = new ConfigurationProperty<>( "mpackadvisor.script", AmbariPath.getPath("/var/lib/ambari-server/resources/scripts/mpack_advisor_wrapper.py")); /** + * If set to true, the legacy stack advisor will be used instead of the newer mpack advisor (default value is false). + */ + @Markdown(description = "If set to true, the legacy stack advisor will be used instead of the newer mpack advisor (default value is false).") + public static final ConfigurationProperty<Boolean> USE_LEGACY_STACK_ADVISOR = new ConfigurationProperty<>( + "use.legacy.stackadvisor", + false); + + + /** * The name of the shell script used to wrap all invocations of Python by Ambari. 
*/ @Markdown(description = "The name of the shell script used to wrap all invocations of Python by Ambari. ") @@ -3377,6 +3386,10 @@ public class Configuration { return getProperty(MPACK_ADVISOR_SCRIPT); } + public boolean shouldUseLegacyStackAdvisor() { + return Boolean.valueOf(getProperty(USE_LEGACY_STACK_ADVISOR)); + } + /** * @return a list of prefixes. Packages whose name starts with any of these * prefixes, should be skipped during upgrade. diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java index 5946977..18afc3f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java @@ -53,6 +53,7 @@ import org.apache.ambari.server.api.services.BaseService; import org.apache.ambari.server.api.services.KeyService; import org.apache.ambari.server.api.services.PersistKeyValueImpl; import org.apache.ambari.server.api.services.PersistKeyValueService; +import org.apache.ambari.server.api.services.mpackadvisor.MpackAdvisorBlueprintProcessor; import org.apache.ambari.server.api.services.mpackadvisor.MpackAdvisorHelper; import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor; import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorHelper; @@ -950,6 +951,7 @@ public class AmbariServer { AmbariPrivilegeResourceProvider.init(injector.getInstance(ClusterDAO.class)); ActionManager.setTopologyManager(injector.getInstance(TopologyManager.class)); StackAdvisorBlueprintProcessor.init(injector.getInstance(StackAdvisorHelper.class)); + MpackAdvisorBlueprintProcessor.init(injector.getInstance(MpackAdvisorHelper.class), injector.getInstance(AmbariMetaInfo.class)); ThreadPoolEnabledPropertyProvider.init(injector.getInstance(Configuration.class)); 
BaseService.init(injector.getInstance(RequestAuditLogger.class)); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java index 9fa570f..d99ae863 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java @@ -57,6 +57,8 @@ import org.apache.ambari.server.actionmanager.HostRoleCommandFactoryImpl; import org.apache.ambari.server.actionmanager.RequestFactory; import org.apache.ambari.server.actionmanager.StageFactory; import org.apache.ambari.server.actionmanager.StageFactoryImpl; +import org.apache.ambari.server.api.services.AdvisorBlueprintProcessor; +import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor; import org.apache.ambari.server.checks.DatabaseConsistencyCheckHelper; import org.apache.ambari.server.checks.PreUpgradeCheck; import org.apache.ambari.server.checks.UpgradeCheckRegistry; @@ -431,6 +433,10 @@ public class ControllerModule extends AbstractModule { InternalAuthenticationInterceptor ambariAuthenticationInterceptor = new InternalAuthenticationInterceptor(); requestInjection(ambariAuthenticationInterceptor); bindInterceptor(any(), annotatedWith(RunWithInternalSecurityContext.class), ambariAuthenticationInterceptor); + + if (configuration.shouldUseLegacyStackAdvisor()) { + bind(AdvisorBlueprintProcessor.class).to(StackAdvisorBlueprintProcessor.class); + } } // ----- helper methods ---------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ConfigurationResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ConfigurationResourceProvider.java index 47bc5bd..5b20a9b 100644 --- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ConfigurationResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ConfigurationResourceProvider.java @@ -254,6 +254,9 @@ public class ConfigurationResourceProvider extends Set<Resource> resources = new HashSet<>(); for (ConfigurationResponse response : responses) { + if (null == response) { + throw new NoSuchResourceException("Could not find configuration resource: " + predicate); + } // don't use the StackId object here; we just want the stack ID string String stackId = response.getStackId().getStackId(); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/MpackAdvisorResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/MpackAdvisorResourceProvider.java index 9f5b740..e0b6491 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/MpackAdvisorResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/MpackAdvisorResourceProvider.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -79,9 +78,9 @@ public abstract class MpackAdvisorResourceProvider extends ReadOnlyResourceProvi private static final String BLUEPRINT_HOST_GROUPS_NAME_PROPERTY = "name"; private static final String BLUEPRINT_HOST_GROUPS_COMPONENTS_PROPERTY = "components"; - private static final String BLUEPRINT_HOST_GROUPS_COMPONENTS_NAME_PROPERTY = "name"; - private static final String BLUEPRINT_HOST_GROUPS_COMPONENTS_MPACK_INSTANCE_PROPERTY = "mpack_instance"; - private static final String BLUEPRINT_HOST_GROUPS_COMPONENTS_SERVICE_INSTANCE_PROPERTY = "service_instance"; + public static final String 
BLUEPRINT_HOST_GROUPS_COMPONENTS_NAME_PROPERTY = "name"; + public static final String BLUEPRINT_HOST_GROUPS_COMPONENTS_MPACK_INSTANCE_PROPERTY = "mpack_instance"; + public static final String BLUEPRINT_HOST_GROUPS_COMPONENTS_SERVICE_INSTANCE_PROPERTY = "service_instance"; private static final String CHANGED_CONFIGURATIONS_PROPERTY = "changed_configurations"; private static final String OPERATION_PROPERTY = "operation"; @@ -408,33 +407,24 @@ public abstract class MpackAdvisorResourceProvider extends ReadOnlyResourceProvi Map<String, Map<String, Set<String>>> mpacksToComponentsHostsMap = new HashMap<>(); if (null != bindingHostGroups && null != hostGroups) { - Iterator hgItr = hostGroups.iterator(); - while (hgItr.hasNext()) { - HostGroup hostGrp = (HostGroup) hgItr.next(); + for (HostGroup hostGrp: hostGroups) { String hgName = hostGrp.getName(); Set<Map<String, String>> components = hostGrp.getComponents(); Set<String> hosts = bindingHostGroups.get(hgName); - Iterator compItr = components.iterator(); - while (compItr.hasNext()) { - Map<String, String> compValueMap = (Map<String, String>) compItr.next(); + for (Map<String, String> compValueMap: components) { String compName = compValueMap.get("name"); String compMpackname = compValueMap.get("mpack_instance"); - Map<String, Set<String>> mpackToComponentsHostsMap = mpacksToComponentsHostsMap.get(compMpackname); - if (mpackToComponentsHostsMap == null) { - mpackToComponentsHostsMap = new HashMap<>(); - mpacksToComponentsHostsMap.put(compMpackname, mpackToComponentsHostsMap); - } + Map<String, Set<String>> componentsHostsMap = mpacksToComponentsHostsMap.computeIfAbsent( + compMpackname, + __ -> new HashMap<>()); + // Check if 'compName' exists. If exists, fetch and update to its existing hosts. // else, add the 'compName' along with its hosts. 
- Set<String> updatedHosts = mpackToComponentsHostsMap.get(compName); - if (updatedHosts == null || updatedHosts.isEmpty()) { - mpackToComponentsHostsMap.put(compName, hosts); - } else { - // Fetch and update the existing host(s) Set. - updatedHosts.addAll(hosts); - mpackToComponentsHostsMap.put(compName, updatedHosts); - } + Set<String> updatedHosts = componentsHostsMap.computeIfAbsent( + compName, + __ -> new HashSet<>()); + updatedHosts.addAll(hosts); } } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequest.java index 123cdf4..99b7f01 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequest.java @@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Enums; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -202,6 +203,7 @@ public class ProvisionClusterRequest extends BaseClusterRequest implements Provi setProvisionAction(parseProvisionAction(properties)); mpackInstances = BlueprintFactory.createMpackInstances(properties); + Preconditions.checkArgument(!getAllMpacks().isEmpty(), "No mpacks (stacks) have been defined. 
Cluster provisioning cannot continue."); stackIds = mpackInstances.stream().map(MpackInstance::getStackId).collect(toSet()); // FIXME persist these try { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java index 48b55aa..3ee636e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java @@ -35,7 +35,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.ambari.server.AmbariException; -import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor; +import org.apache.ambari.server.api.services.AdvisorBlueprintProcessor; import org.apache.ambari.server.controller.ClusterRequest; import org.apache.ambari.server.controller.ConfigurationRequest; import org.apache.ambari.server.controller.ServiceResponse; @@ -71,26 +71,26 @@ public class ClusterConfigurationRequest { private AmbariContext ambariContext; private ClusterTopology clusterTopology; private BlueprintConfigurationProcessor configurationProcessor; - private StackAdvisorBlueprintProcessor stackAdvisorBlueprintProcessor; + private AdvisorBlueprintProcessor advisorBlueprintProcessor; private StackDefinition stack; private boolean configureSecurity = false; public ClusterConfigurationRequest(AmbariContext ambariContext, ClusterTopology topology, - StackAdvisorBlueprintProcessor stackAdvisorBlueprintProcessor, boolean configureSecurity + AdvisorBlueprintProcessor advisorBlueprintProcessor, boolean configureSecurity ) { - this(ambariContext, topology, stackAdvisorBlueprintProcessor); + this(ambariContext, topology, advisorBlueprintProcessor); this.configureSecurity = configureSecurity; } public ClusterConfigurationRequest(AmbariContext ambariContext, 
ClusterTopology clusterTopology, - StackAdvisorBlueprintProcessor stackAdvisorBlueprintProcessor + AdvisorBlueprintProcessor advisorBlueprintProcessor ) { this.ambariContext = ambariContext; this.clusterTopology = clusterTopology; this.stack = clusterTopology.getStack(); // set initial configuration (not topology resolved) this.configurationProcessor = new BlueprintConfigurationProcessor(clusterTopology); - this.stackAdvisorBlueprintProcessor = stackAdvisorBlueprintProcessor; + this.advisorBlueprintProcessor = advisorBlueprintProcessor; removeOrphanConfigTypes(); } @@ -156,7 +156,7 @@ public class ClusterConfigurationRequest { // obtain recommended configurations before config updates if (clusterTopology.getConfigRecommendationStrategy().shouldUseAdvisor()) { // get merged properties form Blueprint & cluster template (this doesn't contains stack default values) - stackAdvisorBlueprintProcessor.adviseConfiguration(this.clusterTopology, userProvidedConfigurations); + advisorBlueprintProcessor.adviseConfiguration(this.clusterTopology, userProvidedConfigurations); } updatedConfigTypes.addAll(configurationProcessor.doUpdateForClusterCreate()); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopology.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopology.java index 9d622d7..dc58899 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopology.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopology.java @@ -148,6 +148,8 @@ public interface ClusterTopology { */ Stream<ResolvedComponent> getComponents(); + Map<String, Set<ResolvedComponent>> getComponentsByHostGroup(); + /** * Get the components that are included in the specified host group. 
* @@ -254,4 +256,6 @@ public interface ClusterTopology { Set<String> getHostNames(); SecurityConfiguration getSecurity(); + + Set<MpackInstance> getMpacks(); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopologyImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopologyImpl.java index 672ffd3..3481da5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopologyImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopologyImpl.java @@ -63,6 +63,7 @@ public class ClusterTopologyImpl implements ClusterTopology { private final Set<StackId> stackIds; private final StackDefinition stack; private final SecurityConfiguration securityConfig; + private final Set<MpackInstance> mpacks; private Long clusterId; private final Blueprint blueprint; private final Configuration configuration; @@ -76,6 +77,17 @@ public class ClusterTopologyImpl implements ClusterTopology { private final Map<String, Set<ResolvedComponent>> resolvedComponents; private final Setting setting; + /** + * This is to collect configurations formerly (in Ambari 2.x) belonging to cluster-env and already migrated to + * cluster settings. Eventually all configurations from cluster-env should be migrated and this collection + * should be removed. 
+ */ + private static final Set<String> SAFE_TO_REMOVE_FROM_CLUSTER_ENV = ImmutableSet.of( + ConfigHelper.COMMAND_RETRY_ENABLED, + ConfigHelper.COMMAND_RETRY_MAX_TIME_IN_SEC, + ConfigHelper.COMMANDS_TO_RETRY + ); + public ClusterTopologyImpl( AmbariContext ambariContext, ExportBlueprintRequest topologyRequest, @@ -88,6 +100,7 @@ public class ClusterTopologyImpl implements ClusterTopology { this.configuration = topologyRequest.getConfiguration(); configRecommendationStrategy = ConfigRecommendationStrategy.getDefault(); securityConfig = blueprint.getSecurity(); + this.mpacks = ImmutableSet.of(); // TODO: fix provisionAction = null; provisionRequest = null; @@ -102,20 +115,37 @@ public class ClusterTopologyImpl implements ClusterTopology { adjustTopology(); } - /** - * This is to collect configurations formerly (in Ambari 2.x) belonging to cluster-env and already migrated to - * cluster settings. Eventually all configurations from cluster-env should be migrated and this collection - * should be removed. 
- */ - private static final Set<String> SAFE_TO_REMOVE_FROM_CLUSTER_ENV = ImmutableSet.of( - ConfigHelper.COMMAND_RETRY_ENABLED, - ConfigHelper.COMMAND_RETRY_MAX_TIME_IN_SEC, - ConfigHelper.COMMANDS_TO_RETRY - ); + public ClusterTopologyImpl( + AmbariContext ambariContext, + BlueprintBasedClusterProvisionRequest request, + Map<String, Set<ResolvedComponent>> resolvedComponents + ) throws InvalidTopologyException { + this.ambariContext = ambariContext; + this.clusterId = request.getClusterId(); + this.blueprint = request.getBlueprint(); + this.configuration = request.getConfiguration(); + this.provisionRequest = request; + this.resolvedComponents = resolvedComponents; + configRecommendationStrategy = + Optional.ofNullable(request.getConfigRecommendationStrategy()).orElse(ConfigRecommendationStrategy.getDefault()); + provisionAction = request.getProvisionAction(); + securityConfig = request.getSecurity(); + + defaultPassword = request.getDefaultPassword(); + stackIds = request.getStackIds(); + stack = request.getStack(); + setting = request.getSetting(); + blueprint.getConfiguration().setParentConfiguration(stack.getConfiguration(getServiceTypes())); + + this.mpacks = request.getAllMpacks(); + + checkForDuplicateHosts(request.getHostGroupInfo()); + registerHostGroupInfo(request.getHostGroupInfo()); + adjustTopology(); + } /** - * This method adjusts cluster topologies coming from the Ambari 2.x blueprint structure for Ambari - * 3.x. + * This method adjusts cluster topologies coming from the Ambari 2.x blueprint structure for Ambari 3.x. * Currently it extract configuration from cluster-env and transforms it into cluster settings. 
*/ private void adjustTopology() { @@ -151,33 +181,6 @@ public class ClusterTopologyImpl implements ClusterTopology { ); } - public ClusterTopologyImpl( - AmbariContext ambariContext, - BlueprintBasedClusterProvisionRequest request, - Map<String, Set<ResolvedComponent>> resolvedComponents - ) throws InvalidTopologyException { - this.ambariContext = ambariContext; - this.clusterId = request.getClusterId(); - this.blueprint = request.getBlueprint(); - this.configuration = request.getConfiguration(); - this.provisionRequest = request; - this.resolvedComponents = resolvedComponents; - configRecommendationStrategy = - Optional.ofNullable(request.getConfigRecommendationStrategy()).orElse(ConfigRecommendationStrategy.getDefault()); - provisionAction = request.getProvisionAction(); - securityConfig = request.getSecurity(); - - defaultPassword = request.getDefaultPassword(); - stackIds = request.getStackIds(); - stack = request.getStack(); - setting = request.getSetting(); - blueprint.getConfiguration().setParentConfiguration(stack.getConfiguration(getServiceTypes())); - - checkForDuplicateHosts(request.getHostGroupInfo()); - registerHostGroupInfo(request.getHostGroupInfo()); - adjustTopology(); - } - public ClusterTopologyImpl withAdditionalComponents(Map<String, Set<ResolvedComponent>> additionalComponents) throws InvalidTopologyException { if (additionalComponents.isEmpty()) { return this; @@ -337,6 +340,11 @@ public class ClusterTopologyImpl implements ClusterTopology { .flatMap(Collection::stream); } + @Override + public Map<String, Set<ResolvedComponent>> getComponentsByHostGroup() { + return resolvedComponents; + } + @Override @Nonnull public Stream<ResolvedComponent> getComponentsInHostGroup(String hostGroup) { return resolvedComponents.computeIfAbsent(hostGroup, __ -> ImmutableSet.of()).stream(); @@ -481,6 +489,12 @@ public class ClusterTopologyImpl implements ClusterTopology { } } + @Override + public Set<MpackInstance> getMpacks() { + return mpacks; + } + + 
private void registerHostGroupInfo(Map<String, HostGroupInfo> requestedHostGroupInfoMap) throws InvalidTopologyException { LOG.debug("Registering requested host group information for {} host groups", requestedHostGroupInfoMap.size()); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/MpackInstance.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/MpackInstance.java index 3497ee5..4c348c2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/MpackInstance.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/MpackInstance.java @@ -64,6 +64,21 @@ public class MpackInstance implements Configurable { this.configuration = configuration; } + public MpackInstance(String mpackName, + String mpackType, + String mpackVersion, + String url, + Configuration configuration, + Collection<ServiceInstance> serviceInstances) { + this.mpackName = mpackName; + this.mpackType = mpackType; + this.mpackVersion = mpackVersion; + this.url = url; + this.configuration = configuration; + this.serviceInstances = serviceInstances; + } + + public MpackInstance(String mpackName, String mpackType, String mpackVersion, Collection<ServiceInstance> serviceInstances) { this.mpackName = mpackName; this.mpackType = mpackType; @@ -71,6 +86,13 @@ public class MpackInstance implements Configurable { this.serviceInstances = serviceInstances; } + /** + * @return a shallow copy of this object + */ + public MpackInstance copy() { + return new MpackInstance(mpackName, mpackType, mpackVersion, url, configuration, serviceInstances); + } + public MpackInstance() { } public String getMpackName() { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ServiceInstance.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ServiceInstance.java index 8fcee98..ce70e33 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ServiceInstance.java +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/ServiceInstance.java @@ -34,6 +34,13 @@ public class ServiceInstance implements Configurable { this.configuration = configuration; } + public ServiceInstance(String name, String type, Configuration configuration, MpackInstance mpackInstance) { + this.name = name; + this.type = type; + this.configuration = configuration; + this.mpackInstance = mpackInstance; + } + public String getName() { return name; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index d47ed27..29fe62d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -40,7 +40,7 @@ import javax.inject.Inject; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; -import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor; +import org.apache.ambari.server.api.services.AdvisorBlueprintProcessor; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; @@ -135,7 +135,7 @@ public class TopologyManager { private Configuration configuration; @Inject - private StackAdvisorBlueprintProcessor stackAdvisorBlueprintProcessor; + private AdvisorBlueprintProcessor advisorBlueprintProcessor; @Inject private LogicalRequestFactory logicalRequestFactory; @@ -326,7 +326,7 @@ public class TopologyManager { clusterTopologyMap.put(clusterId, topology); ClusterConfigurationRequest configurationRequest = new ClusterConfigurationRequest(ambariContext, topology, - stackAdvisorBlueprintProcessor, securityConfiguration.getType() == 
SecurityType.KERBEROS + advisorBlueprintProcessor, securityConfiguration.getType() == SecurityType.KERBEROS ); configurationRequest.setInitialConfigurations(); addClusterConfigRequest(logicalRequest, topology, configurationRequest); @@ -1042,7 +1042,7 @@ public class TopologyManager { LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request is finished, skipping cluster config request"); } else { LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, adding cluster config request"); - ClusterConfigurationRequest configRequest = new ClusterConfigurationRequest(ambariContext, topology, stackAdvisorBlueprintProcessor); + ClusterConfigurationRequest configRequest = new ClusterConfigurationRequest(ambariContext, topology, advisorBlueprintProcessor); addClusterConfigRequest(provisionRequest, topology, configRequest); } } else { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/ExceptionUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/ExceptionUtils.java new file mode 100644 index 0000000..eed8e27 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/ExceptionUtils.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Utilities for more convenient exception handling.
 */
public class ExceptionUtils {

  /**
   * This class only hosts static helpers, it is not meant to be instantiated.
   */
  private ExceptionUtils() {
  }

  /**
   * Utility to simplify the try-catch-rethrow-RuntimeException pattern commonly found in the code.
   *
   * @param throwingLambda A lambda expression that can throw an (usually checked) exception
   * @param <R> The return type of the expression
   * @return The return value of the lambda expression. In case an {@link Exception} is thrown during lambda
   *         invocation the exception is converted into a {@link RuntimeException} if needed. See
   *         {@link #convertToRuntime} for the conversion logic. If the exception is an
   *         {@link InterruptedException}, the interrupt flag of the current thread is set again before rethrowing.
   */
  public static <R> R unchecked(Callable<R> throwingLambda) {
    try {
      return throwingLambda.call();
    }
    catch (Exception ex) {
      throw convertPreservingInterrupt(ex);
    }
  }

  /**
   * Same as {@link #unchecked(Callable)} but for void methods.
   *
   * @param throwingLambda A void lambda expression that can throw an (usually checked) exception
   */
  public static void uncheckedVoid(ThrowingRunnable throwingLambda) {
    try {
      throwingLambda.run();
    }
    catch (Exception ex) {
      throw convertPreservingInterrupt(ex);
    }
  }

  /**
   * Utility to convert checked exceptions to runtime exceptions.
   * <ul>
   *   <li>If the input exception is already of type {@link RuntimeException} the input exception is returned</li>
   *   <li>If the input exception is of type {@link IOException} the exception will be wrapped into an
   *     {@link UncheckedIOException}</li>
   *   <li>Other checked exceptions will be wrapped into a {@link RuntimeException}</li>
   *   <li>Conversion logic could be improved later</li>
   * </ul>
   *
   * @param ex The input exception, can be a checked as well as a runtime exception
   * @return {@code ex} itself if it is a runtime exception, otherwise {@code ex} wrapped in an
   *         {@link UncheckedIOException} (for {@link IOException}s) or in a plain {@link RuntimeException}.
   */
  public static RuntimeException convertToRuntime(Exception ex) {
    if (ex instanceof RuntimeException) {
      return (RuntimeException) ex;
    }
    if (ex instanceof IOException) {
      return new UncheckedIOException((IOException) ex);
    }
    return new RuntimeException(ex);
  }

  /**
   * Converts the exception with {@link #convertToRuntime(Exception)}. When the exception signals an interruption
   * the interrupt flag of the current thread is set again first, so that callers further up the stack can still
   * observe the interruption after the checked exception has been wrapped.
   */
  private static RuntimeException convertPreservingInterrupt(Exception ex) {
    if (ex instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    return convertToRuntime(ex);
  }

  /**
   * A {@link Runnable}-like task that is allowed to throw checked exceptions.
   */
  @FunctionalInterface
  public interface ThrowingRunnable {
    void run() throws Exception;
  }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.api.services.mpackadvisor; + + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static org.apache.ambari.server.api.services.mpackadvisor.MpackAdvisorRequest.MpackAdvisorRequestType.CONFIGURATIONS; +import static org.apache.ambari.server.utils.ExceptionUtils.uncheckedVoid; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.getCurrentArguments; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.MockType.NICE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.api.services.mpackadvisor.recommendations.MpackRecommendationResponse; +import org.apache.ambari.server.state.ComponentInfo; +import org.apache.ambari.server.state.ServiceInfo; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.StackInfo; +import org.apache.ambari.server.state.ValueAttributesInfo; 
+import org.apache.ambari.server.topology.AdvisedConfiguration; +import org.apache.ambari.server.topology.Blueprint; +import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.Component; +import org.apache.ambari.server.topology.ConfigRecommendationStrategy; +import org.apache.ambari.server.topology.Configuration; +import org.apache.ambari.server.topology.HostGroup; +import org.apache.ambari.server.topology.HostGroupImpl; +import org.apache.ambari.server.topology.HostGroupInfo; +import org.apache.ambari.server.topology.MpackInstance; +import org.apache.ambari.server.topology.ResolvedComponent; +import org.apache.ambari.server.topology.ServiceInstance; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +@RunWith(EasyMockRunner.class) +public class MpackAdvisorBlueprintProcessorTest { + + private static final Logger LOG = LoggerFactory.getLogger(MpackAdvisorBlueprintProcessorTest.class); + + private MpackAdvisorBlueprintProcessor processor; + + private static final String HOSTGROUP_1 = "host_group_1"; + + private static final String HOST_1 = "c7401.ambari.apache.org"; + + private static final String NAMENODE = "NAMENODE"; + private static final String DATANODE = "DATANODE"; + private static final String ZOOKEEPER_SERVER = "ZOOKEPER_SERVER"; + private static final String HADOOP_CLIENT = "HADOOP_CLIENT"; + private static final String ZOOKEEPER_CLIENT = "ZOOKEEPER_CLIENT"; + private static final String HBASE_MASTER = "HBASE_MASTER"; + private static final String HBASE_REGIONSERVER = "HBASE_REGIONSERVER"; + + private static final String ZOOKEEPER = 
"ZOOKEEPER"; + private static final String ZOOKEEPER_CLIENTS = "ZOOKEEPER_CLIENTS"; + private static final String HDFS = "HDFS"; + private static final String HADOOP_CLIENTS = "HADOOP_CLIENTS"; + private static final String HBASE = "HBASE"; + + private static final StackId STACK_ID_HDP_CORE = new StackId("HDPCORE", "1.0.0"); + private static final StackId STACK_ID_ODS = new StackId("ODS", "1.0.1"); + + private static final Map<String, Set<String>> mpackComponents = ImmutableMap.of( + STACK_ID_HDP_CORE.getStackName(), ImmutableSet.of(NAMENODE, DATANODE, HADOOP_CLIENT, ZOOKEEPER_SERVER, ZOOKEEPER_CLIENT), + STACK_ID_ODS.getStackName(), ImmutableSet.of(HBASE_MASTER, HBASE_REGIONSERVER, HADOOP_CLIENT, ZOOKEEPER_CLIENT) + ); + + private static final Map<String, String> configTypeToService = ImmutableMap.of( + "zoo.cfg", "ZOOKEEPER", + "hdfs-site", "HDFS", + "hbase-size", "HBASE" + ); + + @Mock(type = NICE) + private MpackAdvisorHelper helper; + + @Mock(type = NICE) + private AmbariMetaInfo metaInfo; + + @Mock(type = NICE) + private ClusterTopology topology; + + @Mock(type = NICE) + private StackInfo hdpCore; + + @Mock(type = NICE) + private StackInfo ods; + + // These test data are reconfigurable to allow for flexibility for later unit tests + private Set<MpackInstance> mpacks = Sets.newHashSet(); + private Map<String, AdvisedConfiguration> advisedConfigurations = new HashMap<>(); + private Map<String, Set<ResolvedComponent>> componentsByHostgroup = new HashMap<>(); + private ConfigRecommendationStrategy recommendationStrategy = ConfigRecommendationStrategy.ALWAYS_APPLY_DONT_OVERRIDE_CUSTOM_VALUES; + private List<ServiceInfo> hdpCoreServices = new ArrayList<>(); + private List<ServiceInfo> odsServices = new ArrayList<>(); + private Configuration configuration = new Configuration(new HashMap<>(), new HashMap<>()); + private Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>(); + private Map<String, HostGroup> hostGroupMap = new HashMap<>(); + private Map<String, 
Map<String, String>> userConfigs = new HashMap<>(); + private MpackRecommendationResponse response = createRecommendationResponse(); + + private final MpackInstance hdpCoreMpack = new MpackInstance( + STACK_ID_HDP_CORE.getStackName(), + STACK_ID_HDP_CORE.getStackName(), + STACK_ID_HDP_CORE.getStackVersion(), + "http://hdpcore.info", + new Configuration(new HashMap<>(), new HashMap<>()), + new HashSet<>()); + + private final MpackInstance odsMpack = new MpackInstance( + STACK_ID_ODS.getStackName(), + STACK_ID_ODS.getStackName(), + STACK_ID_ODS.getStackVersion(), + "http://ods.info", + new Configuration(new HashMap<>(), new HashMap<>()), + new HashSet<>()); + + + public void setUp() throws Exception { + // setup topology + expect(topology.getMpacks()).andReturn(mpacks).anyTimes(); + expect(topology.getAdvisedConfigurations()).andReturn(advisedConfigurations).anyTimes(); + expect(topology.getComponentsByHostGroup()).andReturn(componentsByHostgroup).anyTimes(); + expect(topology.getComponents()).andAnswer( + () -> componentsByHostgroup.values().stream().flatMap(Set::stream)).anyTimes(); + expect(topology.isValidConfigType(anyString())).andReturn(true).anyTimes(); + expect(topology.getConfigRecommendationStrategy()).andAnswer(() -> recommendationStrategy).anyTimes(); + expect(topology.getConfiguration()).andReturn(configuration).anyTimes(); + expect(topology.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes(); + Blueprint blueprint = createMock(Blueprint.class); + expect(blueprint.getHostGroups()).andReturn(hostGroupMap).anyTimes(); + replay(blueprint); + expect(topology.getBlueprint()).andReturn(blueprint).anyTimes(); + replay(topology); + + // setup metainfo + expect(metaInfo.getStack(STACK_ID_HDP_CORE)).andReturn(hdpCore).anyTimes(); + expect(metaInfo.getStack(STACK_ID_ODS)).andReturn(ods).anyTimes(); + replay(metaInfo); + + // setup stacks + expect(hdpCore.getServices()).andReturn(hdpCoreServices).anyTimes(); + replay(hdpCore); + 
expect(ods.getServices()).andReturn(odsServices).anyTimes(); + replay(ods); + + // setup mpack instances + mpacks.add(hdpCoreMpack); + mpacks.add(odsMpack); + + // setup default host groups + addHostGroup(HOSTGROUP_1, + ImmutableList.of(HOST_1), + ImmutableSet.of( + resolvedComponent(HADOOP_CLIENT, HADOOP_CLIENTS, STACK_ID_HDP_CORE), + resolvedComponent(ZOOKEEPER_CLIENT, ZOOKEEPER_CLIENTS, STACK_ID_ODS), + resolvedComponent(NAMENODE, HDFS, STACK_ID_HDP_CORE), + resolvedComponent(DATANODE, HDFS, STACK_ID_HDP_CORE), + resolvedComponent(ZOOKEEPER_SERVER, ZOOKEEPER, STACK_ID_HDP_CORE), + resolvedComponent(HBASE_MASTER, HBASE, STACK_ID_ODS), + resolvedComponent(HBASE_REGIONSERVER, HBASE, STACK_ID_ODS) + ) + ); + + // setup services + hdpCoreServices.addAll(ImmutableList.of( + serviceInfo("HDFS", "HADOOP_CLIENT", "NAMENODE", "DATANODE"), + serviceInfo("ZOOKEEPER", "ZOOKEPER_SERVER"), + serviceInfo("HADOOP_CLIENTS", "HADOOP_CLIENT"), + serviceInfo("ZOOKEEPER_CLIENTS", "ZOOKEEPER_CLIENT") + )); + odsServices.addAll(ImmutableList.of( + serviceInfo("HBASE", "HBASE_MASTER", "HBASE_REGIONSERVER"), + serviceInfo("HADOOP_CLIENTS", "HADOOP_CLIENT"), + serviceInfo("ZOOKEEPER_CLIENTS", "ZOOKEEPER_CLIENT") + )); + + // setup user supplied configs + userConfigs.put("hdfs-site", ImmutableMap.of( + "dfs.datanode.data.dir", "/grid/0/hdfs/data", + "dfs.namenode.name.dir", "/grid/0/hdfs/namenode")); + userConfigs.put("hbase-site", ImmutableMap.of( + "hbase.regionserver.global.memstore.size", "0.3", + "hbase.coprocessor.region.classes", "org.apache.hadoop.hbase.security.access.CustomClass")); + + // setup helper + expect(helper.recommend(anyObject())).andAnswer(() -> { + checkMpackRecommendationRequest((MpackAdvisorRequest)getCurrentArguments()[0]); + return response; + }); + replay(helper); + + MpackAdvisorBlueprintProcessor.init(helper, metaInfo); + processor = new MpackAdvisorBlueprintProcessor(); + } + + @After + public void tearDown() { + reset(helper, metaInfo, topology, hdpCore, 
ods); + } + + @Test + public void testAdviseConfiguration_OverrideUserConfig() throws Exception { + // GIVEN + setUp(); + recommendationStrategy = ConfigRecommendationStrategy.ALWAYS_APPLY; + LOG.info("Testing using recommendation strategy: {}", recommendationStrategy); + + // WHEN + processor.adviseConfiguration(topology, userConfigs); + + // THEN + Map<String, AdvisedConfiguration> advisedConfigurations = topology.getAdvisedConfigurations(); + assertEquals(ImmutableSet.of("hdfs-site", "zoo.cfg", "hbase-site"), advisedConfigurations.keySet()); + + // all recommended configs coming from the advisor must make it into the topology + Set<String> expectedAdvisedZooCfgKeys = ImmutableSet.of("dataDir"); + Set<String> expectedAdvisedHdfsSiteKeys = ImmutableSet.of("dfs.namenode.checkpoint.dir", "dfs.datanode.data.dir"); + Set<String> expectedAdvisedHbaseSiteKeys = ImmutableSet.of("hbase.regionserver.wal.codec", "hbase.regionserver.global.memstore.size"); + + assertEquals(expectedAdvisedZooCfgKeys, advisedConfigurations.get("zoo.cfg").getProperties().keySet()); + assertEquals(expectedAdvisedHdfsSiteKeys, advisedConfigurations.get("hdfs-site").getProperties().keySet()); + assertEquals(expectedAdvisedHdfsSiteKeys, advisedConfigurations.get("hdfs-site").getPropertyValueAttributes().keySet()); + assertEquals(expectedAdvisedHbaseSiteKeys, advisedConfigurations.get("hbase-site").getProperties().keySet()); + assertEquals(expectedAdvisedHbaseSiteKeys, advisedConfigurations.get("hbase-site").getPropertyValueAttributes().keySet()); + } + + @Test + public void testAdviseConfiguration_KeepUserConfig() throws Exception { + // GIVEN + EnumSet.of( + ConfigRecommendationStrategy.ONLY_STACK_DEFAULTS_APPLY, + ConfigRecommendationStrategy.ALWAYS_APPLY_DONT_OVERRIDE_CUSTOM_VALUES).forEach( + strategy -> { + LOG.info("Testing using recommendation strategy: {}", strategy); + recommendationStrategy = strategy; + uncheckedVoid(() -> { + setUp(); + // WHEN + processor.adviseConfiguration(topology, 
userConfigs); + }); + // THEN + + // only keys not in the user supplied configs should be among the advised configs. + Set<String> expectedAdvisedZooCfgKeys = ImmutableSet.of("dataDir"); + Set<String> expectedAdvisedHdfsSiteKeys = ImmutableSet.of("dfs.namenode.checkpoint.dir"); + Set<String> expectedAdvisedHbaseSiteKeys = ImmutableSet.of("hbase.regionserver.wal.codec"); + + assertEquals(expectedAdvisedZooCfgKeys, advisedConfigurations.get("zoo.cfg").getProperties().keySet()); + assertEquals(expectedAdvisedHdfsSiteKeys, advisedConfigurations.get("hdfs-site").getProperties().keySet()); + assertEquals(expectedAdvisedHdfsSiteKeys, advisedConfigurations.get("hdfs-site").getPropertyValueAttributes().keySet()); + assertEquals(expectedAdvisedHbaseSiteKeys, advisedConfigurations.get("hbase-site").getProperties().keySet()); + assertEquals(expectedAdvisedHbaseSiteKeys, advisedConfigurations.get("hbase-site").getPropertyValueAttributes().keySet()); + + tearDown(); + } + ); + } + + + /** + * Check if the MpackAdvisorRequest was created correctly + */ + private void checkMpackRecommendationRequest(MpackAdvisorRequest request) { + assertEquals(CONFIGURATIONS, request.getRequestType()); + + List<String> expectedHosts = + hostGroupInfoMap.values().stream().flatMap(hg -> hg.getHostNames().stream()).collect(toList()); + assertEquals(expectedHosts, request.getHosts()); + + // check mpack -> component -> hosts map + assertTrue(!request.getMpackComponentHostsMap().isEmpty()); + request.getMpackComponentHostsMap().entrySet().forEach( + e -> { + String mpackName = e.getKey(); + Map<String, Set<String>> compentsToHosts = e.getValue(); + assertTrue(!compentsToHosts.isEmpty()); // at least one component is used from the mpack + Set<String> validMpackComponents = mpackComponents.get(mpackName); + compentsToHosts.entrySet().forEach( compToHosts -> { + assertTrue(validMpackComponents.contains(compToHosts.getKey())); // component is valid for that mpack + 
assertTrue(!compToHosts.getValue().isEmpty()); // component is mapped to at least one host + }); + }); + + Map<String, Set<String>> expectedHostBindings = + hostGroupInfoMap.entrySet().stream().collect( toMap( + Map.Entry::getKey, + hostGroupToHosts -> hostGroupToHosts.getValue().getHostNames())); + assertEquals(expectedHostBindings, request.getRecommendation().getBlueprintClusterBinding()); + + MpackAdvisorRequest.Blueprint requestBlueprint = request.getRecommendation().getBlueprint(); + + // verify that mpack instances have been enriched with service instances (mpack advisor requires this even when + // no explicit service instances have been defined in the blueprint) + Set<String> validHdpServiceNames = hdpCoreServices.stream().map(ServiceInfo::getName).collect(toSet()); + Set<String> validOdsServiceNames = odsServices.stream().map(ServiceInfo::getName).collect(toSet()); + requestBlueprint.getMpackInstances().forEach( mpack -> { + Set<String> validServiceNames = + mpack.getMpackType().equals(STACK_ID_HDP_CORE.getStackName()) ? 
validHdpServiceNames : validOdsServiceNames; + Set<String> serviceInstanceNames = mpack.getServiceInstances().stream().map(ServiceInstance::getName).collect(toSet()); + assertTrue("No service instances has been added to mpack " + mpack.getMpackName(), !serviceInstanceNames.isEmpty()); + Set<String> unexpectedServices = Sets.difference(serviceInstanceNames, validServiceNames); + assertTrue("Unexected service instances: " + unexpectedServices + " for mpack instance: " + mpack.getMpackName(), + unexpectedServices.isEmpty()); + }); + } + + /** + * Creates a recommendation response + */ + private MpackRecommendationResponse createRecommendationResponse() { + MpackRecommendationResponse response = new MpackRecommendationResponse(); + MpackRecommendationResponse.Recommendation recommendation = new MpackRecommendationResponse.Recommendation(); + MpackRecommendationResponse.Blueprint blueprint = new MpackRecommendationResponse.Blueprint(); + recommendation.setBlueprint(blueprint); + response.setRecommendations(recommendation); + + MpackRecommendationResponse.MpackInstance responseHdpCoreMpack = responseMpack(STACK_ID_HDP_CORE, + ImmutableSet.of( + serviceInstance("HDFS", + ImmutableMap.of( + "hdfs-site", + config(ImmutableMap.of( + "dfs.namenode.checkpoint.dir", "/hadoop/hdfs/namesecondary", + "dfs.datanode.data.dir", "/hadoop/hdfs/data"))) + ), + serviceInstance("ZOOKEEPER", + ImmutableMap.of( + "zoo.cfg", + config(ImmutableMap.of("dataDir", "/hadoop/zookeeper")))) + ) + ); + + MpackRecommendationResponse.MpackInstance responseOdsMpack = responseMpack(STACK_ID_ODS, + ImmutableSet.of( + serviceInstance("HBASE", + ImmutableMap.of( + "hbase-site", + config(ImmutableMap.of( + "hbase.regionserver.wal.codec", "org.apache.hadoop.hbase.regionserver.wal.WALCellCodec", + "hbase.regionserver.global.memstore.size", "0.4"))) + ) + ) + ); + blueprint.setMpackInstances(ImmutableSet.of(responseHdpCoreMpack, responseOdsMpack)); + return response; + } + + private void addHostGroup(String 
name, List<String> hosts, Set<ResolvedComponent> components) { + hostGroupInfoMap.put(name, hostGroupInfo(name, hosts)); + List<String> componentNames = components.stream().map(ResolvedComponent::componentName).collect(toList()); + hostGroupMap.put(name, hostGroup(name, componentNames)); + componentsByHostgroup.put(name, components); + } + + + // ----- FACTORY METHODS FOR TEST DATA ----- + + private ResolvedComponent resolvedComponent(String name, String serviceName, StackId stackId) { + return ResolvedComponent.builder(name) + .serviceName(serviceName) + .serviceInfo(serviceInfo(serviceName)) + .stackId(stackId) + .buildPartial(); + } + + private MpackRecommendationResponse.MpackInstance responseMpack(StackId stackId, + Set<MpackRecommendationResponse.ServiceInstance> services) { + MpackRecommendationResponse.MpackInstance responseMpack = new MpackRecommendationResponse.MpackInstance(); + responseMpack.setName(stackId.getStackName()); + responseMpack.setVersion(stackId.getStackVersion()); + responseMpack.setServiceInstances(services); + return responseMpack; + } + + private MpackRecommendationResponse.ServiceInstance serviceInstance(String name, Map<String, MpackRecommendationResponse.BlueprintConfigurations> configs) { + MpackRecommendationResponse.ServiceInstance instance = new MpackRecommendationResponse.ServiceInstance(); + instance.setName(name); + instance.setType(name); + instance.setConfigurations(configs); + return instance; + } + + /** + * @param properties Properties in key-value format. 
The same keys will be added as attributes as well + * with the value {@code delete=true} to test the handling of advised attributes + * @return an instance of {@link MpackRecommendationResponse.BlueprintConfigurations} object with the given data + */ + private MpackRecommendationResponse.BlueprintConfigurations config(Map<String, String> properties) { + MpackRecommendationResponse.BlueprintConfigurations config = new MpackRecommendationResponse.BlueprintConfigurations(); + config.setProperties(properties); + Map<String, ValueAttributesInfo> propertyAttributes = properties.keySet().stream().collect(toMap( + Function.identity(), + __ -> { + ValueAttributesInfo valueAttributesInfo = new ValueAttributesInfo(); + valueAttributesInfo.setDelete("true"); + return valueAttributesInfo; + } + )); + config.setPropertyAttributes(propertyAttributes); + return config; + } + + private ServiceInfo serviceInfo(String name, String... components) { + ServiceInfo service = new ServiceInfo(); + service.setName(name); + service.setComponents( + Arrays.stream(components).map(cName -> { + ComponentInfo component = new ComponentInfo(); + component.setName(cName); + return component; + }).collect(toList())); + return service; + } + + private HostGroupInfo hostGroupInfo(String name, List<String> hosts) { + HostGroupInfo info = new HostGroupInfo(name); + hosts.forEach(info::addHost); + return info; + } + + private Component component(String name) { + if (name.contains("@")) { + List<String> nameAndMpack = Splitter.on('@').splitToList(name); + return new Component(nameAndMpack.get(0), nameAndMpack.get(1), null, null); + } + else { + return new Component(name); + } + } + + private HostGroup hostGroup(String name, List<String> components) { + return new HostGroupImpl(name, + components.stream().map(cName -> component(cName)).collect(toList()), + new Configuration(new HashMap<>(), new HashMap<>()), + "1+"); + } + +} \ No newline at end of file diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/api/services/mpackadvisor/commands/MpackAdvisorCommandTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/mpackadvisor/commands/MpackAdvisorCommandTest.java index 3550db3..74fb3eb 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/mpackadvisor/commands/MpackAdvisorCommandTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/mpackadvisor/commands/MpackAdvisorCommandTest.java @@ -25,6 +25,7 @@ import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -126,7 +127,7 @@ public class MpackAdvisorCommandTest { MpackAdvisorCommand.MpackAdvisorData data = new MpackAdvisorCommand.MpackAdvisorData(hostsJSON, servicesJSON); doReturn(hostsJSON).when(command).getHostsInformation(request); doReturn(data).when(command).getServicesInformation(request, hostsJSON); - doReturn(objectNode).when(command).adjust(servicesJSON, request); + doReturn(objectNode).when(command).adjust(eq(servicesJSON), eq(request), any()); doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) throws Throwable { @@ -218,7 +219,7 @@ public class MpackAdvisorCommandTest { ObjectNode objectNode = (ObjectNode) cmd.mapper.readTree(objectNodeStr); doReturn(hostsJSON).when(command).getHostsInformation(request); - doReturn(objectNode).when(command).adjust(servicesJSON, request); + doReturn(objectNode).when(command).adjust(eq(servicesJSON), eq(request), any()); request.getRecommendation().getBlueprint().setMpackInstances(MpackAdvisorHelperTest.createOdsMpackInstance()); response = createNiceMock(Response.class); diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequestTest.java index 18be047..49ee36a 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequestTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequestTest.java @@ -53,6 +53,7 @@ import org.apache.ambari.server.topology.BlueprintFactory; import org.apache.ambari.server.topology.Configuration; import org.apache.ambari.server.topology.HostGroupInfo; import org.apache.ambari.server.topology.InvalidTopologyTemplateException; +import org.apache.ambari.server.topology.MpackInstance; import org.apache.ambari.server.topology.SecurityConfiguration; import org.apache.ambari.server.topology.TopologyRequest; import org.junit.After; @@ -63,6 +64,7 @@ import org.junit.rules.ExpectedException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; /** @@ -98,6 +100,7 @@ public class ProvisionClusterRequestTest { expect(blueprint.getConfiguration()).andReturn(blueprintConfig).anyTimes(); expect(hostResourceProvider.checkPropertyIds(Collections.singleton("Hosts/host_name"))). 
andReturn(Collections.emptySet()).once(); + expect(blueprint.getMpacks()).andReturn(ImmutableSet.of(createMock(MpackInstance.class))).anyTimes(); replay(blueprintFactory, blueprint, hostResourceProvider); } @@ -282,6 +285,15 @@ public class ProvisionClusterRequestTest { assertEquals("someAttributePropValue", clusterScopedTypePropertyAttributes.get("property1")); } + @Test(expected= IllegalArgumentException.class) + public void test_NoMpacksDefined() throws Exception { + Map<String, Object> properties = createBlueprintRequestProperties(CLUSTER_NAME, BLUEPRINT_NAME); + reset(blueprint); + expect(blueprint.getMpacks()).andReturn(ImmutableSet.of()); + replay(blueprint); + new ProvisionClusterRequest(properties, null, true); + } + @Test(expected= InvalidTopologyTemplateException.class) public void test_NoHostGroupInfo() throws Exception { Map<String, Object> properties = createBlueprintRequestProperties(CLUSTER_NAME, BLUEPRINT_NAME);