This is an automated email from the ASF dual-hosted git repository. adoroszlai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new fe80b39 AMBARI-24926. Apply user-defined configuration for Add Service request. (#2639) fe80b39 is described below commit fe80b390b03a47bda87dde95bc9feb11a49818a0 Author: Doroszlai, Attila <6454655+adorosz...@users.noreply.github.com> AuthorDate: Wed Nov 21 17:34:28 2018 +0100 AMBARI-24926. Apply user-defined configuration for Add Service request. (#2639) --- .../addservice/AddServiceOrchestrator.java | 5 +- .../addservice/ResourceProviderAdapter.java | 182 +++++++++++++++++---- 2 files changed, 149 insertions(+), 38 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java index e137d4a..d30ab09 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java @@ -113,8 +113,8 @@ public class AddServiceOrchestrator { throw new IllegalArgumentException("No new services to be added"); } - Configuration config = stack.getValidDefaultConfig(); - // TODO add user-defined config + Configuration config = request.getConfiguration(); + config.setParentConfiguration(stack.getValidDefaultConfig()); RequestStageContainer stages = new RequestStageContainer(actionManager.getNextRequestId(), null, requestFactory, actionManager); AddServiceInfo validatedRequest = new AddServiceInfo(cluster.getClusterName(), stack, config, stages, newServices); @@ -149,6 +149,7 @@ public class AddServiceOrchestrator { */ private void createResources(AddServiceInfo request) { LOG.info("Creating resources for {}", request); + resourceProviders.updateExistingConfigs(request); resourceProviders.createServices(request); resourceProviders.createComponents(request); resourceProviders.createConfigs(request); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java index 722d5f6..ee52c7f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java @@ -20,12 +20,12 @@ package org.apache.ambari.server.topology.addservice; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; import javax.inject.Inject; import javax.inject.Singleton; @@ -56,13 +56,16 @@ import org.apache.ambari.server.controller.spi.UnsupportedPropertyException; import org.apache.ambari.server.controller.utilities.ClusterControllerHelper; import org.apache.ambari.server.controller.utilities.PropertyHelper; import org.apache.ambari.server.security.authorization.AuthorizationException; +import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.State; +import org.apache.ambari.server.topology.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; /** * Creates resources using the resource providers. @@ -76,6 +79,9 @@ public class ResourceProviderAdapter { @Inject private AmbariManagementController controller; + @Inject + private ConfigHelper configHelper; + public void createServices(AddServiceInfo request) { LOG.info("Creating service resources for {}", request); @@ -83,7 +89,7 @@ public class ResourceProviderAdapter { .map(service -> createServiceRequestProperties(request, service)) .collect(toSet()); - createResources(properties, Resource.Type.Service); + createResources(request, properties, Resource.Type.Service); } public void createComponents(AddServiceInfo request) { @@ -94,7 +100,7 @@ public class ResourceProviderAdapter { .map(component -> createComponentRequestProperties(request, componentsOfService.getKey(), component))) .collect(toSet()); - createResources(properties, Resource.Type.Component); + createResources(request, properties, Resource.Type.Component); } public void createHostComponents(AddServiceInfo request) { @@ -106,30 +112,19 @@ public class ResourceProviderAdapter { .map(host -> createHostComponentRequestProperties(request, componentsOfService.getKey(), hostsOfComponent.getKey(), host)))) .collect(toSet()); - createResources(properties, Resource.Type.HostComponent); + createResources(request, properties, Resource.Type.HostComponent); } public void createConfigs(AddServiceInfo request) { LOG.info("Creating configurations for {}", request); + Set<ClusterRequest> requests = createConfigRequestsForNewServices(request); + updateCluster(request, requests, "Error creating configurations for %s"); + } - Set<ClusterRequest> requests = new HashSet<>(); - for (String service : request.newServices().keySet()) { - List<ConfigurationRequest> configRequests = request.getStack().getConfigurationTypes(service).stream() - .filter(configType -> !Objects.equals(configType, ConfigHelper.CLUSTER_ENV)) - .map(configType -> createClusterConfigRequestProperties(request, service, configType)) - .collect(toList()); - ClusterRequest internalRequest = new ClusterRequest(null, request.clusterName(), null, null); - internalRequest.setDesiredConfig(configRequests); - requests.add(internalRequest); - } - - try { - controller.updateClusters(requests, null); - } catch (AmbariException | AuthorizationException e) { - String msg = String.format("Error creating configurations for %s", request); - LOG.error(msg, e); - throw new RuntimeException(msg, e); - } + public void updateExistingConfigs(AddServiceInfo request) { + LOG.info("Updating existing configurations for {}", request); + Set<ClusterRequest> requests = createConfigRequestsForExistingServices(request); + updateCluster(request, requests, "Error updating configurations for %s"); } public void updateServiceDesiredState(AddServiceInfo request, State desiredState) { @@ -149,7 +144,7 @@ public class ResourceProviderAdapter { "context", String.format("Put new components to %s state", desiredState) )); HostComponentResourceProvider rp = (HostComponentResourceProvider) getClusterController().ensureResourceProvider(Resource.Type.HostComponent); - Request internalRequest = createRequest(request, properties, Resource.Type.HostComponent); + Request internalRequest = createRequest(request.clusterName(), properties, Resource.Type.HostComponent); try { rp.doUpdateResources(request.getStages(), internalRequest, predicateForNewServices(request, HostComponentResourceProvider.HOST_ROLES), false, false, false); } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) { @@ -159,35 +154,44 @@ public class ResourceProviderAdapter { } } - private static void createResources(Set<Map<String, Object>> properties, Resource.Type resourceType) { + private static void createResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType) { Request internalRequest = new RequestImpl(null, properties, null, null); ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType); try { rp.createResources(internalRequest); } catch (UnsupportedPropertyException | SystemException | ResourceAlreadyExistsException | NoSuchParentResourceException e) { - String msg = String.format("Error creating resources: %s", resourceType); + String msg = String.format("Error creating resources %s for %s", resourceType, request); LOG.error(msg, e); throw new RuntimeException(msg, e); } } private static void updateResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType, Predicate predicate) { - Request internalRequest = createRequest(request, properties, resourceType); + Request internalRequest = createRequest(request.clusterName(), properties, resourceType); ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType); try { rp.updateResources(internalRequest, predicate); } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) { - String msg = String.format("Error updating resources: %s", resourceType); + String msg = String.format("Error updating resources %s for %s", resourceType, request); LOG.error(msg, e); throw new RuntimeException(msg, e); } } - private static Request createRequest(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType) { - Map<String, String> requestInfoProperties = ImmutableMap.of( + private void updateCluster(AddServiceInfo addServiceRequest, Set<ClusterRequest> requests, String errorMessageFormat) { + try { + controller.updateClusters(requests, null); + } catch (AmbariException | AuthorizationException e) { + String msg = String.format(errorMessageFormat, addServiceRequest); + LOG.error(msg, e); + throw new RuntimeException(msg, e); + } + } + private static Request createRequest(String clusterName, Set<Map<String, Object>> properties, Resource.Type resourceType) { + Map<String, String> requestInfoProperties = ImmutableMap.of( RequestOperationLevel.OPERATION_LEVEL_ID, RequestOperationLevel.getExternalLevelName(resourceType.name()), - RequestOperationLevel.OPERATION_CLUSTER_ID, request.clusterName() + RequestOperationLevel.OPERATION_CLUSTER_ID, clusterName ); return new RequestImpl(null, properties, requestInfoProperties, null); } @@ -225,12 +229,118 @@ public class ResourceProviderAdapter { return properties.build(); } - private static ConfigurationRequest createClusterConfigRequestProperties(AddServiceInfo request, String service, String configType) { - LOG.debug("Creating config type {} for service {}", configType, service); + private static Set<ClusterRequest> createConfigRequestsForNewServices(AddServiceInfo request) { + Map<String, Map<String, String>> fullProperties = request.getConfig().getFullProperties(); + Map<String, Map<String, Map<String, String>>> fullAttributes = request.getConfig().getFullAttributes(); + + return createConfigRequestsForServices( + request.newServices().keySet(), + configType -> !Objects.equals(configType, ConfigHelper.CLUSTER_ENV), + request, fullProperties, fullAttributes + ); + } + + private Set<ClusterRequest> createConfigRequestsForExistingServices(AddServiceInfo request) { + Cluster cluster = getCluster(request.clusterName()); + Map<String, Map<String, String>> desiredConfigTags = getDesiredTags(cluster); + Configuration mergedConfig = new Configuration( + request.getConfig().getProperties(), request.getConfig().getAttributes(), + new Configuration( + configHelper.getEffectiveConfigProperties(cluster, desiredConfigTags), + configHelper.getEffectiveConfigAttributes(cluster, desiredConfigTags) + ) + ); + + Set<String> configTypesInRequest = ImmutableSet.copyOf( + Sets.difference( + Sets.union( + request.getConfig().getProperties().keySet(), + request.getConfig().getAttributes().keySet()), + ImmutableSet.of(ConfigHelper.CLUSTER_ENV)) + ); + + Map<String, Map<String, String>> fullProperties = mergedConfig.getFullProperties(); + Map<String, Map<String, Map<String, String>>> fullAttributes = mergedConfig.getFullAttributes(); + + Set<ClusterRequest> clusterRequests = createConfigRequestsForServices( + cluster.getServices().keySet(), + configTypesInRequest::contains, + request, fullProperties, fullAttributes + ); + + if (request.getConfig().getProperties().containsKey(ConfigHelper.CLUSTER_ENV)) { + Optional<ClusterRequest> clusterEnvRequest = createConfigRequestForConfigTypes(Stream.of(ConfigHelper.CLUSTER_ENV), + request, fullProperties, fullAttributes); + clusterEnvRequest.ifPresent(clusterRequests::add); + } + + return clusterRequests; + } + + private static Set<ClusterRequest> createConfigRequestsForServices( + Set<String> services, + java.util.function.Predicate<String> predicate, + AddServiceInfo request, + Map<String, Map<String, String>> fullProperties, + Map<String, Map<String, Map<String, String>>> fullAttributes + ) { + return services.stream() + .map(service -> createConfigRequestForConfigTypes( + request.getStack().getConfigurationTypes(service).stream() + .filter(predicate), + request, fullProperties, fullAttributes + )) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toSet()); + } + + /** + * Creates a {@link ConfigurationRequest} for each config type in the {@code configTypes} stream. + * + * @return an {@code Optional} {@link ClusterRequest} with desired configs set to all the {@code ConfigurationRequests}, + * or an empty {@code Optional} if the incoming {@code configTypes} stream is empty + */ + private static Optional<ClusterRequest> createConfigRequestForConfigTypes( + Stream<String> configTypes, + AddServiceInfo addServiceRequest, + Map<String, Map<String, String>> fullProperties, + Map<String, Map<String, Map<String, String>>> fullAttributes + ) { + List<ConfigurationRequest> configRequests = configTypes + .peek(configType -> LOG.info("Creating request for config type {} for {}", configType, addServiceRequest)) + .map(configType -> new ConfigurationRequest(addServiceRequest.clusterName(), configType, "ADD_SERVICE", + fullProperties.getOrDefault(configType, ImmutableMap.of()), + fullAttributes.getOrDefault(configType, ImmutableMap.of()))) + .collect(toList()); + + if (configRequests.isEmpty()) { + return Optional.empty(); + } + + ClusterRequest clusterRequest = new ClusterRequest(null, addServiceRequest.clusterName(), null, null); + clusterRequest.setDesiredConfig(configRequests); + return Optional.of(clusterRequest); + } - return new ConfigurationRequest(request.clusterName(), configType, "ADD_SERVICE", - request.getConfig().getProperties().getOrDefault(configType, new HashMap<>(0)), - request.getConfig().getAttributes().getOrDefault(configType, new HashMap<>(0))); + private Map<String, Map<String, String>> getDesiredTags(Cluster cluster) { + try { + return configHelper.getEffectiveDesiredTags(cluster, null); + } catch (AmbariException e) { + String msg = String.format("Error getting tags for desired config of cluster %s", cluster.getClusterName()); + LOG.error(msg); + throw new IllegalStateException(msg, e); + } + } + + private Cluster getCluster(String clusterName) { + try { + return controller.getClusters().getCluster(clusterName); + } catch (AmbariException e) { + String msg = String.format("Cannot find cluster %s", clusterName); + LOG.error(msg); + throw new IllegalStateException(msg, e); + } } private static Predicate predicateForNewServices(AddServiceInfo request, String category) {