Repository: ambari Updated Branches: refs/heads/branch-2.5 eae2a4c5e -> 08a998455
AMBARI-19275. Single API to download all client configs. (jaimin) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/08a99845 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/08a99845 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/08a99845 Branch: refs/heads/branch-2.5 Commit: 08a9984553d26d939b227800cbe2414fbf7d1856 Parents: eae2a4c Author: Jaimin Jetly <jai...@hortonworks.com> Authored: Wed Dec 28 16:09:40 2016 -0800 Committer: Jaimin Jetly <jai...@hortonworks.com> Committed: Wed Dec 28 16:12:20 2016 -0800 ---------------------------------------------------------------------- ambari-server/pom.xml | 5 + .../server/api/services/ComponentService.java | 27 +- .../api/services/HostComponentService.java | 25 +- .../server/configuration/Configuration.java | 19 + .../internal/ClientConfigResourceProvider.java | 850 +++++++++++++------ .../api/services/ComponentServiceTest.java | 4 +- .../api/services/HostComponentServiceTest.java | 4 +- .../ClientConfigResourceProviderTest.java | 36 +- 8 files changed, 684 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/08a99845/ambari-server/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml index c3cdc8b..af9bf7a 100644 --- a/ambari-server/pom.xml +++ b/ambari-server/pom.xml @@ -1025,6 +1025,11 @@ <artifactId>commons-csv</artifactId> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.5</version> + </dependency> + <dependency> <groupId>uk.com.robust-it</groupId> <artifactId>cloning</artifactId> <version>1.9.2</version> http://git-wip-us.apache.org/repos/asf/ambari/blob/08a99845/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java index ded2596..1725b11 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java @@ -41,6 +41,8 @@ import javax.ws.rs.core.UriInfo; import org.apache.ambari.server.api.resources.ResourceInstance; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.spi.Resource; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.Validate; /** * Service responsible for components resource requests. @@ -99,7 +101,12 @@ public class ComponentService extends BaseService { */ @GET @Produces("text/plain") - public Response getComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui) { + public Response getComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui, + @QueryParam("format") String format) { + + if (format != null && format.equals("client_config_tar")) { + return createClientConfigResource(body, headers, ui, null); + } return handleRequest(headers, body, ui, Request.Type.GET, createComponentResource(m_clusterName, m_serviceName, null)); } @@ -227,7 +234,20 @@ public class ComponentService extends BaseService { mapIds.put(Resource.Type.Cluster, m_clusterName); mapIds.put(Resource.Type.Service, m_serviceName); mapIds.put(Resource.Type.Component, componentName); + String filePrefixName; + + if (StringUtils.isEmpty(componentName)) { + if (StringUtils.isEmpty(m_serviceName)) { + filePrefixName = m_clusterName + "(" + Resource.InternalType.Cluster.toString().toUpperCase()+")"; + } else { + filePrefixName = m_serviceName + "(" + Resource.InternalType.Service.toString().toUpperCase()+")"; + } + } else { + filePrefixName = componentName; + } + Validate.notNull(filePrefixName, "compressed config file name should not be null"); + String fileName = filePrefixName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION; Response response = handleRequest(headers, body, ui, Request.Type.GET, createResource(Resource.Type.ClientConfig, mapIds)); @@ -240,7 +260,7 @@ public class ComponentService extends BaseService { Response.ResponseBuilder rb = Response.status(Response.Status.OK); Configuration configs = new Configuration(); String tmpDir = configs.getProperty(Configuration.SERVER_TMP_DIR.getKey()); - File file = new File(tmpDir + File.separator + componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION); + File file = new File(tmpDir,fileName); InputStream resultInputStream = null; try { resultInputStream = new FileInputStream(file); @@ -249,8 +269,7 @@ public class ComponentService extends BaseService { } String contentType = Configuration.DEF_ARCHIVE_CONTENT_TYPE; - String outputFileName = componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION; - rb.header("Content-Disposition", "attachment; filename=\"" + outputFileName + "\""); + rb.header("Content-Disposition", "attachment; filename=\"" + fileName + "\""); rb.entity(resultInputStream); return rb.type(contentType).build(); http://git-wip-us.apache.org/repos/asf/ambari/blob/08a99845/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java index bc9bb30..cdd1761 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java @@ -41,6 +41,8 @@ import javax.ws.rs.core.UriInfo; import org.apache.ambari.server.api.resources.ResourceInstance; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.spi.Resource; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.Validate; /** * Service responsible for host_components resource requests. @@ -107,7 +109,10 @@ public class HostComponentService extends BaseService { */ @GET @Produces("text/plain") - public Response getHostComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui) { + public Response getHostComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui, @QueryParam("format") String format) { + if (format != null && format.equals("client_config_tar")) { + return createClientConfigResource(body, headers, ui, null); + } return handleRequest(headers, body, ui, Request.Type.GET, createHostComponentResource(m_clusterName, m_hostName, null)); } @@ -277,10 +282,21 @@ public class HostComponentService extends BaseService { return response; } + String filePrefixName; + + if (StringUtils.isEmpty(hostComponentName)) { + filePrefixName = m_hostName + "(" + Resource.InternalType.Host.toString().toUpperCase()+")"; + } else { + filePrefixName = hostComponentName; + } + + Validate.notNull(filePrefixName, "compressed config file name should not be null"); + String fileName = filePrefixName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION; + Response.ResponseBuilder rb = Response.status(Response.Status.OK); Configuration configs = new Configuration(); String tmpDir = configs.getProperty(Configuration.SERVER_TMP_DIR.getKey()); - File file = new File(tmpDir+File.separator+hostComponentName+"-configs.tar.gz"); + File file = new File(tmpDir,fileName); InputStream resultInputStream = null; try { resultInputStream = new FileInputStream(file); @@ -288,9 +304,8 @@ public class HostComponentService extends BaseService { e.printStackTrace(); } - String contentType = "application/x-ustar"; - String outputFileName = hostComponentName + "-configs.tar.gz"; - rb.header("Content-Disposition", "attachment; filename=\"" + outputFileName + "\""); + String contentType = Configuration.DEF_ARCHIVE_CONTENT_TYPE; + rb.header("Content-Disposition", "attachment; filename=\"" + fileName + "\""); rb.entity(resultInputStream); return rb.type(contentType).build(); http://git-wip-us.apache.org/repos/asf/ambari/blob/08a99845/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 26c1402..9459d93 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -1737,6 +1737,14 @@ public class Configuration { public static final ConfigurationProperty<Integer> EXTERNAL_SCRIPT_TIMEOUT = new ConfigurationProperty<>( "server.script.timeout", 5000); + /** + * The time, in {@link TimeUnit#MILLISECONDS}, until an external script is killed. + * n threads will execute n/2 scripts. one extra thread is needed to gather error/output stream of external script + */ + @Markdown(description = "The number of threads that should be allocated to run external script.") + public static final ConfigurationProperty<Integer> THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT = new ConfigurationProperty<>( + "server.script.threads", 4); + public static final String DEF_ARCHIVE_EXTENSION; public static final String DEF_ARCHIVE_CONTENT_TYPE; @@ -2823,6 +2831,7 @@ public class Configuration { configsMap.put(SERVER_TMP_DIR.getKey(), getProperty(SERVER_TMP_DIR)); configsMap.put(LOG4JMONITOR_DELAY.getKey(), getProperty(LOG4JMONITOR_DELAY)); configsMap.put(EXTERNAL_SCRIPT_TIMEOUT.getKey(), getProperty(EXTERNAL_SCRIPT_TIMEOUT)); + configsMap.put(THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT.getKey(), getProperty(THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT)); configsMap.put(SHARED_RESOURCES_DIR.getKey(), getProperty(SHARED_RESOURCES_DIR)); configsMap.put(KDC_PORT.getKey(), getProperty(KDC_PORT)); configsMap.put(AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT.getKey(), getProperty(AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT)); @@ -4228,6 +4237,16 @@ public class Configuration { return Integer.parseInt(getProperty(EXTERNAL_SCRIPT_TIMEOUT)); } + //THREAD_POOL_FOR_EXTERNAL_SCRIPT + + /** + * Get the threadpool size for external script execution + * @return {Integer} + */ + public Integer getExternalScriptThreadPoolSize() { + return Integer.parseInt(getProperty(THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT)); + } + public boolean getParallelStageExecution() { return Boolean.parseBoolean(configsMap.get(PARALLEL_STAGE_EXECUTION.getKey())); } http://git-wip-us.apache.org/repos/asf/ambari/blob/08a99845/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java index 020a454..8a35c98 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.ambari.server.controller.internal; import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.AGENT_STACK_RETRY_COUNT; @@ -36,9 +37,13 @@ import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_NAM import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_VERSION; import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.USER_LIST; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -53,6 +58,9 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.ambari.server.AmbariException; @@ -85,6 +93,13 @@ import org.apache.ambari.server.state.ServiceOsSpecific; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.utils.SecretReference; import org.apache.ambari.server.utils.StageUtils; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +121,6 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv protected static final String COMPONENT_COMPONENT_NAME_PROPERTY_ID = "ServiceComponentInfo/component_name"; protected static final String HOST_COMPONENT_HOST_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "host_name"); - private static final int SCRIPT_TIMEOUT = 1500; private final Gson gson; @@ -173,75 +187,116 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv throw new SystemException("Failed to get components ", e); } + Map<String,ServiceComponentHostResponse> componentMap = new HashMap<>(); + + // reduce set of sch responses to one sch response for every service component + for (ServiceComponentHostResponse resp: responses) { + String componentName = resp.getComponentName(); + if (!componentMap.containsKey(componentName)) { + componentMap.put(resp.getComponentName(),resp); + } + } + + ServiceComponentHostRequest schRequest = requests.iterator().next(); + String requestComponentName = schRequest.getComponentName(); + String requestServiceName = schRequest.getServiceName(); + String requestHostName = schRequest.getHostname(); + + Map<String,List<ServiceComponentHostResponse>> serviceToComponentMap = new HashMap<String,List<ServiceComponentHostResponse>>(); + + // sch response for the service components that have configFiles defined in the stack definition of the service + List <ServiceComponentHostResponse> schWithConfigFiles = new ArrayList<>(); + Configuration configs = new Configuration(); Map<String, String> configMap = configs.getConfigsMap(); String TMP_PATH = configMap.get(Configuration.SERVER_TMP_DIR.getKey()); String pythonCmd = configMap.get(Configuration.AMBARI_PYTHON_WRAP.getKey()); - AmbariManagementController managementController = getManagementController(); - ConfigHelper configHelper = managementController.getConfigHelper(); - Cluster cluster = null; - Clusters clusters = managementController.getClusters(); - try { - cluster = clusters.getCluster(responses.iterator().next().getClusterName()); + List<String> pythonCompressFilesCmds = new ArrayList<>(); - StackId stackId = cluster.getCurrentStackVersion(); - String serviceName = responses.iterator().next().getServiceName(); - String componentName = responses.iterator().next().getComponentName(); - String hostName = responses.iterator().next().getHostname(); - ComponentInfo componentInfo = null; - String packageFolder = null; + for (ServiceComponentHostResponse response : componentMap.values()){ - componentInfo = managementController.getAmbariMetaInfo(). - getComponent(stackId.getStackName(), stackId.getStackVersion(), serviceName, componentName); - packageFolder = managementController.getAmbariMetaInfo(). - getService(stackId.getStackName(), stackId.getStackVersion(), serviceName).getServicePackageFolder(); + AmbariManagementController managementController = getManagementController(); + ConfigHelper configHelper = managementController.getConfigHelper(); + Cluster cluster = null; + Clusters clusters = managementController.getClusters(); + try { + cluster = clusters.getCluster(response.getClusterName()); + + StackId stackId = cluster.getCurrentStackVersion(); + String serviceName = response.getServiceName(); + String componentName = response.getComponentName(); + String hostName = response.getHostname(); + ComponentInfo componentInfo = null; + String packageFolder = null; + + componentInfo = managementController.getAmbariMetaInfo(). + getComponent(stackId.getStackName(), stackId.getStackVersion(), serviceName, componentName); + packageFolder = managementController.getAmbariMetaInfo(). + getService(stackId.getStackName(), stackId.getStackVersion(), serviceName).getServicePackageFolder(); + + String commandScript = componentInfo.getCommandScript().getScript(); + List<ClientConfigFileDefinition> clientConfigFiles = componentInfo.getClientConfigFiles(); + + if (clientConfigFiles == null) { + if (componentMap.size() == 1) { + throw new SystemException("No configuration files defined for the component " + componentInfo.getName()); + } else { + LOG.debug(String.format("No configuration files defined for the component %s",componentInfo.getName())); + continue; + } + } - String commandScript = componentInfo.getCommandScript().getScript(); - List<ClientConfigFileDefinition> clientConfigFiles = componentInfo.getClientConfigFiles(); + // service component hosts that have configFiles defined in the stack definition of the service + schWithConfigFiles.add(response); - if (clientConfigFiles == null) { - throw new SystemException("No configuration files defined for the component " + componentInfo.getName()); - } + if (serviceToComponentMap.containsKey(response.getServiceName())) { + List <ServiceComponentHostResponse> schResponseList = serviceToComponentMap.get(serviceName); + schResponseList.add(response); + } else { + List <ServiceComponentHostResponse> schResponseList = new ArrayList<>(); + schResponseList.add(response); + serviceToComponentMap.put(serviceName,schResponseList); + } - String resourceDirPath = configs.getResourceDirPath(); - String packageFolderAbsolute = resourceDirPath + File.separator + packageFolder; + String resourceDirPath = configs.getResourceDirPath(); + String packageFolderAbsolute = resourceDirPath + File.separator + packageFolder; - String commandScriptAbsolute = packageFolderAbsolute + File.separator + commandScript; + String commandScriptAbsolute = packageFolderAbsolute + File.separator + commandScript; - Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>(); - Map<String, Long> configVersions = new TreeMap<String, Long>(); - Map<String, Map<PropertyType, Set<String>>> configPropertiesTypes = new TreeMap<>(); - Map<String, Map<String, Map<String, String>>> configurationAttributes = new TreeMap<String, Map<String, Map<String, String>>>(); + Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>(); + Map<String, Long> configVersions = new TreeMap<String, Long>(); + Map<String, Map<PropertyType, Set<String>>> configPropertiesTypes = new TreeMap<>(); + Map<String, Map<String, Map<String, String>>> configurationAttributes = new TreeMap<String, Map<String, Map<String, String>>>(); - Map<String, DesiredConfig> desiredClusterConfigs = cluster.getDesiredConfigs(); + Map<String, DesiredConfig> desiredClusterConfigs = cluster.getDesiredConfigs(); - //Get configurations and configuration attributes - for (Map.Entry<String, DesiredConfig> desiredConfigEntry : desiredClusterConfigs.entrySet()) { + //Get configurations and configuration attributes + for (Map.Entry<String, DesiredConfig> desiredConfigEntry : desiredClusterConfigs.entrySet()) { - String configType = desiredConfigEntry.getKey(); - DesiredConfig desiredConfig = desiredConfigEntry.getValue(); - Config clusterConfig = cluster.getConfig(configType, desiredConfig.getTag()); + String configType = desiredConfigEntry.getKey(); + DesiredConfig desiredConfig = desiredConfigEntry.getValue(); + Config clusterConfig = cluster.getConfig(configType, desiredConfig.getTag()); - if (clusterConfig != null) { - Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties()); + if (clusterConfig != null) { + Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties()); - // Apply global properties for this host from all config groups - Map<String, Map<String, String>> allConfigTags = null; - allConfigTags = configHelper - .getEffectiveDesiredTags(cluster, hostName); + // Apply global properties for this host from all config groups + Map<String, Map<String, String>> allConfigTags = null; + allConfigTags = configHelper + .getEffectiveDesiredTags(cluster, schRequest.getHostname()); - Map<String, Map<String, String>> configTags = new HashMap<String, - Map<String, String>>(); + Map<String, Map<String, String>> configTags = new HashMap<String, + Map<String, String>>(); - for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) { - if (entry.getKey().equals(clusterConfig.getType())) { - configTags.put(clusterConfig.getType(), entry.getValue()); + for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) { + if (entry.getKey().equals(clusterConfig.getType())) { + configTags.put(clusterConfig.getType(), entry.getValue()); + } } - } - Map<String, Map<String, String>> properties = configHelper - .getEffectiveConfigProperties(cluster, configTags); + Map<String, Map<String, String>> properties = configHelper + .getEffectiveConfigProperties(cluster, configTags); if (!properties.isEmpty()) { for (Map<String, String> propertyMap : properties.values()) { @@ -249,178 +304,198 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv } } - configurations.put(clusterConfig.getType(), props); - configVersions.put(clusterConfig.getType(), clusterConfig.getVersion()); - configPropertiesTypes.put(clusterConfig.getType(), clusterConfig.getPropertiesTypes()); + configurations.put(clusterConfig.getType(), props); + configVersions.put(clusterConfig.getType(), clusterConfig.getVersion()); + configPropertiesTypes.put(clusterConfig.getType(), clusterConfig.getPropertiesTypes()); - Map<String, Map<String, String>> attrs = new TreeMap<String, Map<String, String>>(); - configHelper.cloneAttributesMap(clusterConfig.getPropertiesAttributes(), attrs); + Map<String, Map<String, String>> attrs = new TreeMap<String, Map<String, String>>(); + configHelper.cloneAttributesMap(clusterConfig.getPropertiesAttributes(), attrs); - Map<String, Map<String, Map<String, String>>> attributes = configHelper - .getEffectiveConfigAttributes(cluster, configTags); - for (Map<String, Map<String, String>> attributesMap : attributes.values()) { - configHelper.cloneAttributesMap(attributesMap, attrs); + Map<String, Map<String, Map<String, String>>> attributes = configHelper + .getEffectiveConfigAttributes(cluster, configTags); + for (Map<String, Map<String, String>> attributesMap : attributes.values()) { + configHelper.cloneAttributesMap(attributesMap, attrs); + } + configurationAttributes.put(clusterConfig.getType(), attrs); } - configurationAttributes.put(clusterConfig.getType(), attrs); } - } - ConfigHelper.processHiddenAttribute(configurations, configurationAttributes, componentName, true); + ConfigHelper.processHiddenAttribute(configurations, configurationAttributes, componentName, true); - for(Map.Entry<String, Map<String, Map<String, String>>> configurationAttributesEntry : configurationAttributes.entrySet()){ - Map<String, Map<String, String>> attrs = configurationAttributesEntry.getValue(); - // remove internal attributes like "hidden" - attrs.remove("hidden"); - } + for (Map.Entry<String, Map<String, Map<String, String>>> configurationAttributesEntry : configurationAttributes.entrySet()) { + Map<String, Map<String, String>> attrs = configurationAttributesEntry.getValue(); + // remove internal attributes like "hidden" + attrs.remove("hidden"); + } - // replace passwords on password references - for(Map.Entry<String, Map<String, String>> configEntry: configurations.entrySet()) { - String configType = configEntry.getKey(); - Map<String, String> configProperties = configEntry.getValue(); - Long configVersion = configVersions.get(configType); - Map<PropertyType, Set<String>> propertiesTypes = configPropertiesTypes.get(configType); - SecretReference.replacePasswordsWithReferences(propertiesTypes, configProperties, configType, configVersion); - } + // replace passwords on password references + for (Map.Entry<String, Map<String, String>> configEntry : configurations.entrySet()) { + String configType = configEntry.getKey(); + Map<String, String> configProperties = configEntry.getValue(); + Long configVersion = configVersions.get(configType); + Map<PropertyType, Set<String>> propertiesTypes = configPropertiesTypes.get(configType); + SecretReference.replacePasswordsWithReferences(propertiesTypes, configProperties, configType, configVersion); + } - Map<String, Set<String>> clusterHostInfo = null; - ServiceInfo serviceInfo = null; - String osFamily = null; - clusterHostInfo = StageUtils.getClusterHostInfo(cluster); - serviceInfo = managementController.getAmbariMetaInfo().getService(stackId.getStackName(), - stackId.getStackVersion(), serviceName); - try { - clusterHostInfo = StageUtils.substituteHostIndexes(clusterHostInfo); - } catch (AmbariException e) { - // Before moving substituteHostIndexes to StageUtils, a SystemException was thrown in the - // event an index could not be mapped to a host. After the move, this was changed to an - // AmbariException for consistency in the StageUtils class. To keep this method consistent - // with how it behaved in the past, if an AmbariException is thrown, it is caught and - // translated to a SystemException. - throw new SystemException(e.getMessage(), e); - } - osFamily = clusters.getHost(hostName).getOsFamily(); - - TreeMap<String, String> hostLevelParams = new TreeMap<String, String>(); - hostLevelParams.put(JDK_LOCATION, managementController.getJdkResourceUrl()); - hostLevelParams.put(JAVA_HOME, managementController.getJavaHome()); - hostLevelParams.put(JAVA_VERSION, String.valueOf(configs.getJavaVersion())); - hostLevelParams.put(JDK_NAME, managementController.getJDKName()); - hostLevelParams.put(JCE_NAME, managementController.getJCEName()); - hostLevelParams.put(STACK_NAME, stackId.getStackName()); - hostLevelParams.put(STACK_VERSION, stackId.getStackVersion()); - hostLevelParams.put(DB_NAME, managementController.getServerDB()); - hostLevelParams.put(MYSQL_JDBC_URL, managementController.getMysqljdbcUrl()); - hostLevelParams.put(ORACLE_JDBC_URL, managementController.getOjdbcUrl()); - hostLevelParams.put(HOST_SYS_PREPPED, configs.areHostsSysPrepped()); - hostLevelParams.putAll(managementController.getRcaParameters()); - hostLevelParams.put(AGENT_STACK_RETRY_ON_UNAVAILABILITY, configs.isAgentStackRetryOnInstallEnabled()); - hostLevelParams.put(AGENT_STACK_RETRY_COUNT, configs.getAgentStackRetryOnInstallCount()); - - // Write down os specific info for the service - ServiceOsSpecific anyOs = null; - if (serviceInfo.getOsSpecifics().containsKey(AmbariMetaInfo.ANY_OS)) { - anyOs = serviceInfo.getOsSpecifics().get(AmbariMetaInfo.ANY_OS); - } + Map<String, Set<String>> clusterHostInfo = null; + ServiceInfo serviceInfo = null; + String osFamily = null; + clusterHostInfo = StageUtils.getClusterHostInfo(cluster); + serviceInfo = managementController.getAmbariMetaInfo().getService(stackId.getStackName(), + stackId.getStackVersion(), serviceName); + try { + clusterHostInfo = StageUtils.substituteHostIndexes(clusterHostInfo); + } catch (AmbariException e) { + // Before moving substituteHostIndexes to StageUtils, a SystemException was thrown in the + // event an index could not be mapped to a host. After the move, this was changed to an + // AmbariException for consistency in the StageUtils class. To keep this method consistent + // with how it behaved in the past, if an AmbariException is thrown, it is caught and + // translated to a SystemException. + throw new SystemException(e.getMessage(), e); + } + osFamily = clusters.getHost(hostName).getOsFamily(); + + TreeMap<String, String> hostLevelParams = new TreeMap<String, String>(); + hostLevelParams.put(JDK_LOCATION, managementController.getJdkResourceUrl()); + hostLevelParams.put(JAVA_HOME, managementController.getJavaHome()); + hostLevelParams.put(JAVA_VERSION, String.valueOf(configs.getJavaVersion())); + hostLevelParams.put(JDK_NAME, managementController.getJDKName()); + hostLevelParams.put(JCE_NAME, managementController.getJCEName()); + hostLevelParams.put(STACK_NAME, stackId.getStackName()); + hostLevelParams.put(STACK_VERSION, stackId.getStackVersion()); + hostLevelParams.put(DB_NAME, managementController.getServerDB()); + hostLevelParams.put(MYSQL_JDBC_URL, managementController.getMysqljdbcUrl()); + hostLevelParams.put(ORACLE_JDBC_URL, managementController.getOjdbcUrl()); + hostLevelParams.put(HOST_SYS_PREPPED, configs.areHostsSysPrepped()); + hostLevelParams.putAll(managementController.getRcaParameters()); + hostLevelParams.put(AGENT_STACK_RETRY_ON_UNAVAILABILITY, configs.isAgentStackRetryOnInstallEnabled()); + hostLevelParams.put(AGENT_STACK_RETRY_COUNT, configs.getAgentStackRetryOnInstallCount()); + + // Write down os specific info for the service + ServiceOsSpecific anyOs = null; + if (serviceInfo.getOsSpecifics().containsKey(AmbariMetaInfo.ANY_OS)) { + anyOs = serviceInfo.getOsSpecifics().get(AmbariMetaInfo.ANY_OS); + } - ServiceOsSpecific hostOs = populateServicePackagesInfo(serviceInfo, hostLevelParams, osFamily); + ServiceOsSpecific hostOs = populateServicePackagesInfo(serviceInfo, hostLevelParams, osFamily); - // Build package list that is relevant for host - List<ServiceOsSpecific.Package> packages = - new ArrayList<ServiceOsSpecific.Package>(); - if (anyOs != null) { - packages.addAll(anyOs.getPackages()); - } + // Build package list that is relevant for host + List<ServiceOsSpecific.Package> packages = + new ArrayList<ServiceOsSpecific.Package>(); + if (anyOs != null) { + packages.addAll(anyOs.getPackages()); + } - if (hostOs != null) { - packages.addAll(hostOs.getPackages()); - } - String packageList = gson.toJson(packages); - hostLevelParams.put(PACKAGE_LIST, packageList); - - Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredClusterConfigs); - String userList = gson.toJson(userSet); - hostLevelParams.put(USER_LIST, userList); - - Set<String> groupSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.GROUP, cluster, desiredClusterConfigs); - String groupList = gson.toJson(groupSet); - hostLevelParams.put(GROUP_LIST, groupList); - - Set<String> notManagedHdfsPathSet = configHelper.getPropertyValuesWithPropertyType(stackId,PropertyType.NOT_MANAGED_HDFS_PATH, cluster, desiredClusterConfigs); - String notManagedHdfsPathList = gson.toJson(notManagedHdfsPathSet); - hostLevelParams.put(NOT_MANAGED_HDFS_PATH_LIST, notManagedHdfsPathList); - - String jsonConfigurations = null; - Map<String, Object> commandParams = new HashMap<String, Object>(); - List<Map<String, String>> xmlConfigs = new LinkedList<Map<String, String>>(); - List<Map<String, String>> envConfigs = new LinkedList<Map<String, String>>(); - List<Map<String, String>> propertiesConfigs = new LinkedList<Map<String, String>>(); - - //Fill file-dictionary configs from metainfo - for (ClientConfigFileDefinition clientConfigFile : clientConfigFiles) { - Map<String, String> fileDict = new HashMap<String, String>(); - fileDict.put(clientConfigFile.getFileName(), clientConfigFile.getDictionaryName()); - if (clientConfigFile.getType().equals("xml")) { - xmlConfigs.add(fileDict); - } else if (clientConfigFile.getType().equals("env")) { - envConfigs.add(fileDict); - } else if (clientConfigFile.getType().equals("properties")) { - propertiesConfigs.add(fileDict); + if (hostOs != null) { + packages.addAll(hostOs.getPackages()); + } + String packageList = gson.toJson(packages); + hostLevelParams.put(PACKAGE_LIST, packageList); + + Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredClusterConfigs); + String userList = gson.toJson(userSet); + hostLevelParams.put(USER_LIST, userList); + + Set<String> groupSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.GROUP, cluster, desiredClusterConfigs); + String groupList = gson.toJson(groupSet); + hostLevelParams.put(GROUP_LIST, groupList); + + Set<String> notManagedHdfsPathSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.NOT_MANAGED_HDFS_PATH, cluster, desiredClusterConfigs); + String notManagedHdfsPathList = gson.toJson(notManagedHdfsPathSet); + hostLevelParams.put(NOT_MANAGED_HDFS_PATH_LIST, notManagedHdfsPathList); + + String jsonConfigurations = null; + Map<String, Object> commandParams = new HashMap<String, Object>(); + List<Map<String, String>> xmlConfigs = new LinkedList<Map<String, String>>(); + List<Map<String, String>> envConfigs = new LinkedList<Map<String, String>>(); + List<Map<String, String>> propertiesConfigs = new LinkedList<Map<String, String>>(); + + //Fill file-dictionary configs from metainfo + for (ClientConfigFileDefinition clientConfigFile : clientConfigFiles) { + Map<String, String> fileDict = new HashMap<String, String>(); + fileDict.put(clientConfigFile.getFileName(), clientConfigFile.getDictionaryName()); + if (clientConfigFile.getType().equals("xml")) { + xmlConfigs.add(fileDict); + } else if (clientConfigFile.getType().equals("env")) { + envConfigs.add(fileDict); + } else if (clientConfigFile.getType().equals("properties")) { + propertiesConfigs.add(fileDict); + } } - } - commandParams.put("xml_configs_list", xmlConfigs); - commandParams.put("env_configs_list", envConfigs); - commandParams.put("properties_configs_list", propertiesConfigs); - commandParams.put("output_file", componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION); - - Map<String, Object> jsonContent = new TreeMap<String, Object>(); - jsonContent.put("configurations", configurations); - jsonContent.put("configuration_attributes", configurationAttributes); - jsonContent.put("commandParams", commandParams); - jsonContent.put("clusterHostInfo", clusterHostInfo); - jsonContent.put("hostLevelParams", hostLevelParams); - jsonContent.put("hostname", hostName); - jsonContent.put("clusterName", cluster.getClusterName()); - jsonConfigurations = gson.toJson(jsonContent); - - File jsonFileName = new File(TMP_PATH + File.separator + componentName + "-configuration.json"); - File tmpDirectory = new File(jsonFileName.getParent()); - if (!tmpDirectory.exists()) { + commandParams.put("xml_configs_list", xmlConfigs); + commandParams.put("env_configs_list", envConfigs); + commandParams.put("properties_configs_list", propertiesConfigs); + commandParams.put("output_file", componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION); + + Map<String, Object> jsonContent = new TreeMap<String, Object>(); + jsonContent.put("configurations", configurations); + jsonContent.put("configuration_attributes", configurationAttributes); + jsonContent.put("commandParams", commandParams); + jsonContent.put("clusterHostInfo", clusterHostInfo); + jsonContent.put("hostLevelParams", hostLevelParams); + jsonContent.put("hostname", hostName); + jsonContent.put("clusterName", cluster.getClusterName()); + jsonConfigurations = gson.toJson(jsonContent); + + File jsonFileName = new File(TMP_PATH + File.separator + componentName + "-configuration.json"); + File tmpDirectory = new File(jsonFileName.getParent()); + if (!tmpDirectory.exists()) { + try { + tmpDirectory.mkdirs(); + tmpDirectory.setWritable(true, true); + tmpDirectory.setReadable(true, true); + } catch (SecurityException se) { + throw new SystemException("Failed to get temporary directory to store configurations", se); + } + } + PrintWriter printWriter = null; try { - tmpDirectory.mkdirs(); - tmpDirectory.setWritable(true, true); - tmpDirectory.setReadable(true, true); - } catch (SecurityException se) { - throw new SystemException("Failed to get temporary directory to store configurations", se); + printWriter = new PrintWriter(jsonFileName.getAbsolutePath()); + printWriter.print(jsonConfigurations); + printWriter.close(); + } catch (FileNotFoundException e) { + throw new SystemException("Failed to write configurations to json file ", e); } - } - PrintWriter printWriter = null; - try { - printWriter = new PrintWriter(jsonFileName.getAbsolutePath()); - printWriter.print(jsonConfigurations); - printWriter.close(); - } catch (FileNotFoundException e) { - throw new SystemException("Failed to write configurations to json file ", e); - } - String cmd = pythonCmd + " " + commandScriptAbsolute + " generate_configs " + jsonFileName.getAbsolutePath() + " " + - packageFolderAbsolute + " " + TMP_PATH + File.separator + "structured-out.json" + " INFO " + TMP_PATH; + String cmd = pythonCmd + " " + commandScriptAbsolute + " generate_configs " + jsonFileName.getAbsolutePath() + " " + + packageFolderAbsolute + " " + TMP_PATH + File.separator + "structured-out.json" + " INFO " + TMP_PATH; - try { - executeCommand(cmd, configs.getExternalScriptTimeout()); - } catch (TimeoutException e) { - LOG.error("Generate client configs script was killed due to timeout ", e); - throw new SystemException("Generate client configs script was killed due to timeout ", e); - } catch (InterruptedException | IOException e) { - LOG.error("Failed to run generate client configs script for a component " + componentName, e); - throw new SystemException("Failed to run generate client configs script for a component " + componentName, e); - } catch (ExecutionException e) { - LOG.error(e.getMessage(),e); - throw new SystemException(e.getMessage() + " " + e.getCause()); + pythonCompressFilesCmds.add(cmd); + + } catch (AmbariException e) { + throw new SystemException("Controller error ", e); } + } + + if (schWithConfigFiles.isEmpty()) { + throw new SystemException("No configuration files defined for any component" ); + } - } catch (AmbariException e) { - throw new SystemException("Controller error ", e); + Integer threadPoolSize = configs.getExternalScriptThreadPoolSize(); + ExecutorService processExecutor = Executors.newFixedThreadPool(threadPoolSize); + + // put all threads that starts process to compress each component config files in the executor + List<CommandLineThreadWrapper> pythonCmdThreads = executeCommands(processExecutor, pythonCompressFilesCmds); + + // wait for all threads to finish + Integer timeout = configs.getExternalScriptTimeout(); + waitForAllThreadsToJoin(processExecutor, pythonCmdThreads, timeout); + + if (StringUtils.isEmpty(requestComponentName)) { + TarUtils tarUtils; + String fileName; + List <ServiceComponentHostResponse> schToTarConfigFiles = schWithConfigFiles; + if (StringUtils.isNotEmpty(requestHostName)) { + fileName = requestHostName + "(" + Resource.InternalType.Host.toString().toUpperCase()+")"; + } else if (StringUtils.isNotEmpty(requestServiceName)) { + fileName = requestServiceName + "(" + Resource.InternalType.Service.toString().toUpperCase()+")"; + schToTarConfigFiles = serviceToComponentMap.get(requestServiceName); + } else { + fileName = schRequest.getClusterName() + "(" + Resource.InternalType.Cluster.toString().toUpperCase()+")"; + } + tarUtils = new TarUtils(TMP_PATH, fileName, schToTarConfigFiles); + tarUtils.tarConfigFiles(); } Resource resource = new ResourceImpl(Resource.Type.ClientConfig); @@ -428,89 +503,214 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv return resources; } - @Override - public RequestStatus updateResources(final Request request, Predicate predicate) - throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { - - throw new SystemException("The request is not supported"); - } - - @Override - public RequestStatus deleteResources(Request request, Predicate predicate) - throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { - - throw new SystemException("The request is not supported"); - } - - - // ----- AbstractResourceProvider ------------------------------------------ - - @Override - protected Set<String> getPKPropertyIds() { - return pkPropertyIds; - } - - - // ----- utility methods --------------------------------------------------- - /** - * Get a component request object from a map of property values. - * - * @param properties the predicate - * @return the component request object + * Execute all external script commands + * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool + * @param commandLines List of {String} commands that starts the python process to compress config files + * @return {@link CommandLineThreadWrapper} + * @throws SystemException */ - - private ServiceComponentHostRequest getRequest(Map<String, Object> properties) { - return new ServiceComponentHostRequest( - (String) properties.get(COMPONENT_CLUSTER_NAME_PROPERTY_ID), - (String) properties.get(COMPONENT_SERVICE_NAME_PROPERTY_ID), - (String) properties.get(COMPONENT_COMPONENT_NAME_PROPERTY_ID), - (String) properties.get(HOST_COMPONENT_HOST_NAME_PROPERTY_ID), - null); + private List<CommandLineThreadWrapper> executeCommands(final ExecutorService processExecutor, List<String> commandLines) + throws SystemException { + List <CommandLineThreadWrapper> commandLineThreadWrappers = new ArrayList<>(); + try { + for (String commandLine : commandLines) { + CommandLineThreadWrapper commandLineThreadWrapper = executeCommand(processExecutor,commandLine); + commandLineThreadWrappers.add(commandLineThreadWrapper); + } + } catch (IOException e) { + LOG.error("Failed to run generate client configs script for components"); + processExecutor.shutdownNow(); + throw new SystemException("Failed to run generate client configs script for components"); + } + return commandLineThreadWrappers; } - private int executeCommand(final String commandLine, - final long timeout) - throws IOException, InterruptedException, TimeoutException, ExecutionException { + /** + * Execute external script command + * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool + * @param commandLine {String} command that starts the python process to compress config files + * @return {@link CommandLineThreadWrapper} + * @throws IOException + */ + private CommandLineThreadWrapper executeCommand(final ExecutorService processExecutor, final String commandLine) + throws IOException { ProcessBuilder builder = new ProcessBuilder(Arrays.asList(commandLine.split("\\s+"))); builder.redirectErrorStream(true); Process process = builder.start(); CommandLineThread commandLineThread = new CommandLineThread(process); LogStreamReader logStream = new LogStreamReader(process.getInputStream()); Thread logStreamThread = new Thread(logStream, "LogStreamReader"); - logStreamThread.start(); - commandLineThread.start(); + // log collecting thread should be always put first in the executor + processExecutor.execute(logStreamThread); + processExecutor.execute(commandLineThread); + return new CommandLineThreadWrapper(commandLine, commandLineThread, + logStreamThread, logStream, process); + } + + + /** + * Waits for all threads to join that have started python process to tar config files for component + * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool + * @param pythonCmdThreads list of {@link CommandLineThreadWrapper} + * @param timeout {Integer} time to wait for the threads to join + * @throws SystemException + */ + private void waitForAllThreadsToJoin(ExecutorService processExecutor, List <CommandLineThreadWrapper> pythonCmdThreads, Integer timeout) + throws SystemException { + processExecutor.shutdown(); try { - commandLineThread.join(timeout); - logStreamThread.join(timeout); - Integer returnCode = commandLineThread.getReturnCode(); - if (returnCode == null) { - throw new TimeoutException(); - } else if (returnCode != 0) { - throw new ExecutionException(String.format("Execution of \"%s\" returned %d.", commandLine, returnCode), - new Throwable(logStream.getOutput())); - } else { - return commandLineThread.returnCode; + if (!processExecutor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { + processExecutor.shutdownNow(); + for (CommandLineThreadWrapper commandLineThreadWrapper: pythonCmdThreads) { + CommandLineThread commandLineThread = commandLineThreadWrapper.getCommandLineThread(); + try { + Integer returnCode = commandLineThread.getReturnCode(); + if (returnCode == null) { + throw new TimeoutException(); + } else if (returnCode != 0) { + throw new ExecutionException(String.format("Execution of \"%s\" returned %d.", commandLineThreadWrapper.getCommandLine(), returnCode), + new Throwable(commandLineThreadWrapper.getLogStream().getOutput())); + } + } catch (TimeoutException e) { + LOG.error("Generate client configs script was killed due to timeout ", e); + throw new SystemException("Generate client configs script was killed due to timeout ", e); + } catch (ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new SystemException(e.getMessage() + " " + e.getCause()); + } finally { + commandLineThreadWrapper.getProcess().destroy(); + } + } } - } catch (InterruptedException ex) { - commandLineThread.interrupt(); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw ex; - } finally { - process.destroy(); + processExecutor.shutdownNow(); + LOG.error("Failed to run generate client configs script for components"); + throw new SystemException("Failed to run generate client configs script for components"); } } + + /** + * wrapper class that holds all information and references to the thread and python process + * started to create compressed configuration config files + */ + private static class CommandLineThreadWrapper { + + private String commandLine; + + private CommandLineThread commandLineThread; + + private Thread logStreamThread; + + private LogStreamReader logStream; + + private Process process; + + private CommandLineThreadWrapper(String commandLine, CommandLineThread commandLineThread, + Thread logStreamThread, LogStreamReader logStream, Process process) { + this.commandLine = commandLine; + this.commandLineThread = commandLineThread; + this.logStreamThread = logStreamThread; + this.logStream = logStream; + this.process = process; + } + + /** + * Returns commandLine that starts pyton process + * @return {@link #commandLine} + */ + private String getCommandLine() { + return commandLine; + } + + /** + * Sets {@link #commandLine} + * @param commandLine {String} + */ + private void setCommandLine(String commandLine) { + this.commandLine = commandLine; + } + + /** + * Returns thread that starts and waits for python process to complete + * @return {@link #commandLineThread} + */ + private CommandLineThread getCommandLineThread() { + return commandLineThread; + } + + /** + * Sets {@link #commandLineThread} + * @param commandLineThread {@link CommandLineThread} + */ + private void setCommandLineThread(CommandLineThread commandLineThread) { + this.commandLineThread = commandLineThread; + } + + /** + * Returns thread that starts and waits to get the output and error log stream from the python process + * @return {@link #logStreamThread} + */ + private Thread getLogStreamThread() { + return logStreamThread; + } + + /** + * Sets {@link #logStreamThread} + * @param logStreamThread {@link Thread} + */ + private void setLogStreamThread(Thread logStreamThread) { + this.logStreamThread = logStreamThread; + } + + /** + * Returns log stream from the python subprocess + * @return {@link #logStream} + */ + private LogStreamReader getLogStream() { + return logStream; + } + + /** + * Sets {@link #logStream} + * @param logStream {@link LogStreamReader} + */ + private void setLogStream(LogStreamReader logStream) { + this.logStream = logStream; + } + + /** + * Returns python process + * @return {@link #process} + */ + private Process getProcess() { + return process; + } + + /** + * Sets {@link #process} + * @param process {@link Process} + */ + private void setProcess(Process process) { + this.process = process; + } + } + + /** + * Class to run python process to compress config files as seperate thread + */ private static class CommandLineThread extends Thread { private final Process process; private Integer returnCode; - public void setReturnCode(Integer exit) { + private void setReturnCode(Integer exit) { returnCode = exit; } - public Integer getReturnCode() { + private Integer getReturnCode() { return returnCode; } @@ -530,6 +730,9 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv } + /** + * Class to collect output and error stream of python subprocess + */ private class LogStreamReader implements Runnable { private BufferedReader reader; @@ -560,6 +763,131 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv } } + + /** + * This is the utility class to do further compression related operations + * on already compressed component configuration files + */ + protected static class TarUtils { + + /** + * temporary dir where tar files are saved on ambari server + */ + private String tmpDir; + + /** + * name of the compressed file that should be created + */ + private String fileName; + + + private List<ServiceComponentHostResponse> serviceComponentHostResponses; + + /** + * Constructor sets all the fields of the class + * @param tmpDir {String} + * @param fileName {String} + * @param serviceComponentHostResponses {List} + */ + TarUtils(String tmpDir, String fileName, List<ServiceComponentHostResponse> serviceComponentHostResponses) { + this.tmpDir = tmpDir; + this.fileName = fileName; + this.serviceComponentHostResponses = serviceComponentHostResponses; + } + + /** + * creates single compressed file from the list of existing compressed file + * @throws SystemException + */ + protected void tarConfigFiles() + throws SystemException { + + try { + File compressedOutputFile = new File(tmpDir, fileName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION); + FileOutputStream fOut = new FileOutputStream(compressedOutputFile); + BufferedOutputStream bOut = new BufferedOutputStream(fOut); + GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bOut); + TarArchiveOutputStream tOut = new TarArchiveOutputStream(gzOut); + + try { + for (ServiceComponentHostResponse schResponse : serviceComponentHostResponses) { + String componentName = schResponse.getComponentName(); + File compressedInputFile = new File(tmpDir, componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION); + FileInputStream fin = new FileInputStream(compressedInputFile); + BufferedInputStream bIn = new BufferedInputStream(fin); + GzipCompressorInputStream gzIn = new GzipCompressorInputStream(bIn); + TarArchiveInputStream tarIn = new TarArchiveInputStream(gzIn); + TarArchiveEntry entry = null; + try { + while ((entry = tarIn.getNextTarEntry()) != null) { + entry.setName(componentName + File.separator + entry.getName()); + tOut.putArchiveEntry(entry); + if (entry.isFile()) { + IOUtils.copy(tarIn, tOut); + } + tOut.closeArchiveEntry(); + } + } catch (Exception e) { + throw new SystemException(e.getMessage(), e); + } finally { + tarIn.close(); + gzIn.close(); + bIn.close(); + fin.close(); + } + } + } finally { + tOut.finish(); + tOut.close(); + } + } catch (Exception e) { + throw new SystemException(e.getMessage(), e); + } + } + } + + @Override + public RequestStatus updateResources(final Request request, Predicate predicate) + throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { + + throw new SystemException("The request is not supported"); + } + + @Override + public RequestStatus deleteResources(Request request, Predicate predicate) + throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { + + throw new SystemException("The request is not supported"); + } + + + // ----- AbstractResourceProvider ------------------------------------------ + + @Override + protected Set<String> getPKPropertyIds() { + return pkPropertyIds; + } + + + // ----- utility methods --------------------------------------------------- + + /** + * Get a component request object from a map of property values. + * + * @param properties the predicate + * @return the component request object + */ + + private ServiceComponentHostRequest getRequest(Map<String, Object> properties) { + return new ServiceComponentHostRequest( + (String) properties.get(COMPONENT_CLUSTER_NAME_PROPERTY_ID), + (String) properties.get(COMPONENT_SERVICE_NAME_PROPERTY_ID), + (String) properties.get(COMPONENT_COMPONENT_NAME_PROPERTY_ID), + (String) properties.get(HOST_COMPONENT_HOST_NAME_PROPERTY_ID), + null); + } + + protected ServiceOsSpecific populateServicePackagesInfo(ServiceInfo serviceInfo, Map<String, String> hostParams, String osFamily) { ServiceOsSpecific hostOs = new ServiceOsSpecific(osFamily); http://git-wip-us.apache.org/repos/asf/ambari/blob/08a99845/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java index dc57a5b..d2318fb 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java @@ -49,8 +49,8 @@ public class ComponentServiceTest extends BaseServiceTest { //getComponents service = new TestComponentService("clusterName", "serviceName", null); - m = service.getClass().getMethod("getComponents", String.class, HttpHeaders.class, UriInfo.class); - args = new Object[] {null, getHttpHeaders(), getUriInfo()}; + m = service.getClass().getMethod("getComponents", String.class, HttpHeaders.class, UriInfo.class, String.class); + args = new Object[] {null, getHttpHeaders(), getUriInfo(), null}; listInvocations.add(new ServiceTestInvocation(Request.Type.GET, service, m, args, null)); //createComponent http://git-wip-us.apache.org/repos/asf/ambari/blob/08a99845/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java index 86a2d38..7c6402a 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java @@ -48,8 +48,8 @@ public class HostComponentServiceTest extends BaseServiceTest { //getHostComponents componentService = new TestHostComponentService("clusterName", "serviceName", null); - m = componentService.getClass().getMethod("getHostComponents", String.class, HttpHeaders.class, UriInfo.class); - args = new Object[] {null, getHttpHeaders(), getUriInfo()}; + m = componentService.getClass().getMethod("getHostComponents", String.class, HttpHeaders.class, UriInfo.class, String.class); + args = new Object[] {null, getHttpHeaders(), getUriInfo(), null}; listInvocations.add(new ServiceTestInvocation(Request.Type.GET, componentService, m, args, null)); //createHostComponent http://git-wip-us.apache.org/repos/asf/ambari/blob/08a99845/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java index 4be5013..8fad94e 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java @@ -22,9 +22,11 @@ import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertFalse; +import static org.powermock.api.mockito.PowerMockito.whenNew; import java.io.ByteArrayInputStream; import java.io.File; @@ -75,6 +77,7 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; +import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -212,14 +215,6 @@ public class ClientConfigResourceProviderTest { PropertyHelper.getKeyPropertyIds(type), managementController); - // create the request - Request request = PropertyHelper.getReadRequest(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID, "c1", - ClientConfigResourceProvider.COMPONENT_COMPONENT_NAME_PROPERTY_ID, - ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID); - - Predicate predicate = new PredicateBuilder().property(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID).equals("c1"). - toPredicate(); - String clusterName = "C1"; String serviceName = "PIG"; String componentName = "PIG"; @@ -274,11 +269,12 @@ public class ClientConfigResourceProviderTest { expect(configuration.areHostsSysPrepped()).andReturn("false"); expect(configuration.isAgentStackRetryOnInstallEnabled()).andReturn("false"); expect(configuration.getAgentStackRetryOnInstallCount()).andReturn("5"); + expect(configuration.getExternalScriptThreadPoolSize()).andReturn(Configuration.THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT.getDefaultValue()); expect(configuration.getExternalScriptTimeout()).andReturn(Configuration.EXTERNAL_SCRIPT_TIMEOUT.getDefaultValue()); Map<String,String> props = new HashMap<String, String>(); props.put("key","value"); expect(clusterConfig.getProperties()).andReturn(props); - expect(configHelper.getEffectiveDesiredTags(cluster, hostName)).andReturn(allConfigTags); + expect(configHelper.getEffectiveDesiredTags(cluster, null)).andReturn(allConfigTags); expect(cluster.getClusterName()).andReturn(clusterName); expect(managementController.getHostComponents((Set<ServiceComponentHostRequest>) anyObject())).andReturn(responses).anyTimes(); expect(cluster.getCurrentStackVersion()).andReturn(stackId); @@ -347,6 +343,19 @@ public class ClientConfigResourceProviderTest { InputStream inputStream = new ByteArrayInputStream("some logging info".getBytes()); expect(process.getInputStream()).andReturn(inputStream); + ClientConfigResourceProvider.TarUtils tarUtilMock = PowerMockito.mock(ClientConfigResourceProvider.TarUtils.class); + whenNew(ClientConfigResourceProvider.TarUtils.class).withAnyArguments().thenReturn(tarUtilMock); + tarUtilMock.tarConfigFiles(); + expectLastCall().once(); + + // create the request + Request request = PropertyHelper.getReadRequest(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID, "c1", + ClientConfigResourceProvider.COMPONENT_COMPONENT_NAME_PROPERTY_ID, + ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID); + + Predicate predicate = new PredicateBuilder().property(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID). + equals("c1").and().property(ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID).equals("PIG").toPredicate(); + // replay replay(managementController, clusters, cluster, ambariMetaInfo, stackId, componentInfo, commandScriptDefinition, clusterConfig, host, service, serviceComponent, serviceComponentHost, serviceInfo, configHelper, @@ -418,8 +427,10 @@ public class ClientConfigResourceProviderTest { ClientConfigResourceProvider.COMPONENT_COMPONENT_NAME_PROPERTY_ID, ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID); - Predicate predicate = new PredicateBuilder().property(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID).equals("c1"). - toPredicate(); + Predicate predicate = new PredicateBuilder().property(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID). + equals("c1").and().property(ClientConfigResourceProvider.COMPONENT_COMPONENT_NAME_PROPERTY_ID).equals("PIG"). + and().property(ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID).equals("PIG"). + toPredicate(); String clusterName = "C1"; String serviceName = "PIG"; @@ -476,12 +487,13 @@ public class ClientConfigResourceProviderTest { expect(configuration.areHostsSysPrepped()).andReturn("false"); expect(configuration.isAgentStackRetryOnInstallEnabled()).andReturn("false"); expect(configuration.getAgentStackRetryOnInstallCount()).andReturn("5"); + expect(configuration.getExternalScriptThreadPoolSize()).andReturn(Configuration.THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT.getDefaultValue()); expect(configuration.getExternalScriptTimeout()).andReturn(Configuration.EXTERNAL_SCRIPT_TIMEOUT.getDefaultValue()); Map<String,String> props = new HashMap<String, String>(); props.put("key","value"); expect(clusterConfig.getProperties()).andReturn(props); - expect(configHelper.getEffectiveDesiredTags(cluster, hostName)).andReturn(allConfigTags); + expect(configHelper.getEffectiveDesiredTags(cluster, null)).andReturn(allConfigTags); expect(cluster.getClusterName()).andReturn(clusterName); expect(managementController.getHostComponents((Set<ServiceComponentHostRequest>) anyObject())).andReturn(responses).anyTimes(); expect(cluster.getCurrentStackVersion()).andReturn(stackId);