AMBARI-22628 - YARN Shuffle Service Can't Be Found On Client-Only Nodes After New Cluster Install (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f844afdc Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f844afdc Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f844afdc Branch: refs/heads/branch-feature-AMBARI-22008-isilon Commit: f844afdcdfc5a397ddc61c602026a8887ea9566c Parents: 51e2058 Author: Jonathan Hurley <jhur...@hortonworks.com> Authored: Mon Dec 11 09:03:09 2017 -0500 Committer: Attila Magyar <amag...@hortonworks.com> Committed: Mon Dec 18 09:12:46 2017 +0100 ---------------------------------------------------------------------- .../ambari/server/agent/ExecutionCommand.java | 37 ++++++---------- .../internal/ClientConfigResourceProvider.java | 2 + .../org/apache/ambari/server/state/Cluster.java | 9 ++++ .../server/state/cluster/ClusterImpl.java | 31 +++++++++++++ .../2.1.0.2.0/package/scripts/params_linux.py | 41 +++++++++++++++-- .../3.0.0.3.0/package/scripts/params_linux.py | 41 +++++++++++++++-- .../ExecutionCommandWrapperTest.java | 5 +++ .../AmbariCustomCommandExecutionHelperTest.java | 46 ++++++++++++++++++++ .../apache/ambari/server/orm/OrmTestHelper.java | 3 ++ 9 files changed, 183 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java index 5ee4bf6..9093985 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java @@ -29,8 +29,6 @@ import org.apache.ambari.annotations.ExperimentalFeature; import 
org.apache.ambari.server.AmbariException; import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.Service; -import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.UpgradeContext.UpgradeSummary; import org.apache.ambari.server.utils.StageUtils; import org.slf4j.Logger; @@ -553,30 +551,21 @@ public class ExecutionCommand extends AgentCommand { } /** - * Used to set a map of {service -> { component -> version}}. This is necessary when performing - * an upgrade to correct build paths of required binaries. - * @param cluster the cluster from which to build the map + * Used to set a map of {service -> { component -> version}}. This is + * necessary when performing an upgrade to correct build paths of required + * binaries. This method will only set the version information for a component + * if: + * <ul> + * <li>The component advertises a version</li> + * <li>The repository for the component has been resolved and the version can + * be trusted</li> + * </ul> + * + * @param cluster + * the cluster from which to build the map */ public void setComponentVersions(Cluster cluster) throws AmbariException { - Map<String, Map<String, String>> componentVersionMap = new HashMap<>(); - - for (Service service : cluster.getServices().values()) { - Map<String, String> componentMap = new HashMap<>(); - - boolean shouldSet = false; - for (ServiceComponent component : service.getServiceComponents().values()) { - if (component.isVersionAdvertised()) { - shouldSet = true; - componentMap.put(component.getName(), component.getDesiredVersion()); - } - } - - if (shouldSet) { - componentVersionMap.put(service.getName(), componentMap); - } - } - - this.componentVersionMap = componentVersionMap; + componentVersionMap = cluster.getComponentVersionMap(); } /** 
http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java index 75f47ca..dcafdea 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java @@ -458,6 +458,8 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv jsonContent.put("clusterName", cluster.getClusterName()); jsonContent.put("serviceName", serviceName); jsonContent.put("role", componentName); + jsonContent.put("componentVersionMap", cluster.getComponentVersionMap()); + jsonConfigurations = gson.toJson(jsonContent); File tmpDirectory = new File(TMP_PATH); http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index 90dd611..62226af 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -673,4 +673,13 @@ public interface Cluster { */ void addSuspendedUpgradeParameters(Map<String, String> commandParams, Map<String, String> roleParams); + + /** + * Gets a mapping of service to component/version for every installed + * component in the cluster which advertises a version and for which the + * 
repository has been resolved. + * + * @return a mapping of service to component version, or an empty map. + */ + Map<String, Map<String, String>> getComponentVersionMap(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index e6d42ce..8346727 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -2704,4 +2704,35 @@ public class ClusterImpl implements Cluster { // suspended goes in role params roleParams.put(KeyNames.UPGRADE_SUSPENDED, Boolean.TRUE.toString().toLowerCase()); } + + /** + * {@inheritDoc} + */ + @Override + public Map<String, Map<String, String>> getComponentVersionMap() { + Map<String, Map<String, String>> componentVersionMap = new HashMap<>(); + + for (Service service : getServices().values()) { + Map<String, String> componentMap = new HashMap<>(); + for (ServiceComponent component : service.getServiceComponents().values()) { + // skip components which don't advertise a version + if (!component.isVersionAdvertised()) { + continue; + } + + // if the repo isn't resolved, then we can't trust the version + if (!component.getDesiredRepositoryVersion().isResolved()) { + continue; + } + + componentMap.put(component.getName(), component.getDesiredVersion()); + } + + if (!componentMap.isEmpty()) { + componentVersionMap.put(service.getName(), componentMap); + } + } + + return componentVersionMap; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py 
---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py index 1bc01d4..9997779 100644 --- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py +++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py @@ -21,6 +21,7 @@ Ambari Agent import os from resource_management.core import sudo +from resource_management.core.logger import Logger from resource_management.libraries.script.script import Script from resource_management.libraries.resources.hdfs_resource import HdfsResource from resource_management.libraries.functions import component_version @@ -84,11 +85,45 @@ stack_supports_timeline_state_store = check_stack_feature(StackFeature.TIMELINE_ # It cannot be used during the initial Cluser Install because the version is not yet known. version = default("/commandParams/version", None) +def get_spark_version(service_name, component_name, yarn_version): + """ + Attempts to calculate the correct version placeholder value for spark or spark2 based on + what is installed in the cluster. If Spark is not installed, then this value will need to be + that of YARN so it can still find the correct shuffle class. + + On cluster installs, we have not yet calculated any versions and all known values could be None. + This doesn't affect daemons, but it does affect client-only hosts where they will never receive + a start command after install. Therefore, this function will attempt to use stack-select as a + last resort to get a valid value. 
+ :param service_name: the service name (SPARK, SPARK2, etc) + :param component_name: the component name (SPARK_CLIENT, etc) + :param yarn_version: the default version of Yarn to use if no spark is installed + :return: a value for the version placeholder in shuffle classpath properties + """ + # start off seeing if we need to populate a default value for YARN + if yarn_version is None: + yarn_version = component_version.get_component_repository_version(service_name = "YARN", + component_name = "YARN_CLIENT") + + # now try to get the version of spark/spark2, defaulting to the version of YARN + shuffle_classpath_version = component_version.get_component_repository_version(service_name = service_name, + component_name = component_name, default_value = yarn_version) + + # even with the default of using YARN's version, on an install this might be None since we haven't + # calculated the version of YARN yet - use stack_select as a last ditch effort + if shuffle_classpath_version is None: + try: + shuffle_classpath_version = stack_select.get_role_component_current_stack_version() + except: + Logger.exception("Unable to query for the correct shuffle classpath") + + return shuffle_classpath_version + # these are used to render the classpath for picking up Spark classes # in the event that spark is not installed, then we must default to the vesrion of YARN installed # since it will still load classes from its own spark version -spark_version = component_version.get_component_repository_version(service_name = "SPARK", component_name = "SPARK_CLIENT", default_value = version) -spark2_version = component_version.get_component_repository_version(service_name = "SPARK2", component_name = "SPARK2_CLIENT", default_value = version) +spark_version = get_spark_version("SPARK", "SPARK_CLIENT", version) +spark2_version = get_spark_version("SPARK2", "SPARK2_CLIENT", version) stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, 
version_for_stack_feature_checks) stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks) @@ -548,5 +583,3 @@ if enable_ranger_yarn and is_supported_yarn_ranger: # need this to capture cluster name from where ranger yarn plugin is enabled cluster_name = config['clusterName'] - -# ranger yarn plugin end section http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py index b8a5382..8e9632c 100644 --- a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py +++ b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py @@ -21,6 +21,7 @@ Ambari Agent import os from resource_management.core import sudo +from resource_management.core.logger import Logger from resource_management.libraries.script.script import Script from resource_management.libraries.resources.hdfs_resource import HdfsResource from resource_management.libraries.functions import component_version @@ -84,11 +85,45 @@ stack_supports_timeline_state_store = check_stack_feature(StackFeature.TIMELINE_ # It cannot be used during the initial Cluser Install because the version is not yet known. version = default("/commandParams/version", None) +def get_spark_version(service_name, component_name, yarn_version): + """ + Attempts to calculate the correct version placeholder value for spark or spark2 based on + what is installed in the cluster. If Spark is not installed, then this value will need to be + that of YARN so it can still find the correct shuffle class. 
+ + On cluster installs, we have not yet calculated any versions and all known values could be None. + This doesn't affect daemons, but it does affect client-only hosts where they will never receive + a start command after install. Therefore, this function will attempt to use stack-select as a + last resort to get a valid value. + :param service_name: the service name (SPARK, SPARK2, etc) + :param component_name: the component name (SPARK_CLIENT, etc) + :param yarn_version: the default version of Yarn to use if no spark is installed + :return: a value for the version placeholder in shuffle classpath properties + """ + # start off seeing if we need to populate a default value for YARN + if yarn_version is None: + yarn_version = component_version.get_component_repository_version(service_name = "YARN", + component_name = "YARN_CLIENT") + + # now try to get the version of spark/spark2, defaulting to the version of YARN + shuffle_classpath_version = component_version.get_component_repository_version(service_name = service_name, + component_name = component_name, default_value = yarn_version) + + # even with the default of using YARN's version, on an install this might be None since we haven't + # calculated the version of YARN yet - use stack_select as a last ditch effort + if shuffle_classpath_version is None: + try: + shuffle_classpath_version = stack_select.get_role_component_current_stack_version() + except: + Logger.exception("Unable to query for the correct shuffle classpath") + + return shuffle_classpath_version + # these are used to render the classpath for picking up Spark classes # in the event that spark is not installed, then we must default to the vesrion of YARN installed # since it will still load classes from its own spark version -spark_version = component_version.get_component_repository_version(service_name = "SPARK", component_name = "SPARK_CLIENT", default_value = version) -spark2_version = 
component_version.get_component_repository_version(service_name = "SPARK2", component_name = "SPARK2_CLIENT", default_value = version) +spark_version = get_spark_version("SPARK", "SPARK_CLIENT", version) +spark2_version = get_spark_version("SPARK2", "SPARK2_CLIENT", version) stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks) stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks) @@ -547,5 +582,3 @@ if enable_ranger_yarn and is_supported_yarn_ranger: # need this to capture cluster name from where ranger yarn plugin is enabled cluster_name = config['clusterName'] - -# ranger yarn plugin end section http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java index f12c701..10ef219 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java @@ -285,7 +285,12 @@ public class ExecutionCommandWrapperTest { Cluster cluster = clusters.getCluster(CLUSTER1); StackId stackId = cluster.getDesiredStackVersion(); + + // set the repo version resolved state to verify that the version is not sent RepositoryVersionEntity repositoryVersion = ormTestHelper.getOrCreateRepositoryVersion(stackId, "0.1-0000"); + repositoryVersion.setResolved(false); + ormTestHelper.repositoryVersionDAO.merge(repositoryVersion); + Service service = cluster.getService("HDFS"); service.setDesiredRepositoryVersion(repositoryVersion); 
http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java index 26c79e6..6bece66 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java @@ -564,6 +564,52 @@ public class AmbariCustomCommandExecutionHelperTest { Assert.assertTrue(command.getComponentVersionMap().containsKey("ZOOKEEPER")); } + /** + * Tests that if a component's repository is not resolved, then the repo + * version map does not get populated. 
+ * + * @throws Exception + */ + @Test + public void testAvailableServicesMapIsEmptyWhenRepositoriesNotResolved() throws Exception { + + // set all repos to resolve=false to verify that we don't get a + // component version map + RepositoryVersionDAO repositoryVersionDAO = injector.getInstance(RepositoryVersionDAO.class); + List<RepositoryVersionEntity> repoVersions = repositoryVersionDAO.findAll(); + for (RepositoryVersionEntity repoVersion : repoVersions) { + repoVersion.setResolved(false); + repositoryVersionDAO.merge(repoVersion); + } + + Map<String, String> requestProperties = new HashMap<String, String>() { + { + put(REQUEST_CONTEXT_PROPERTY, "Refresh YARN Capacity Scheduler"); + put("command", "REFRESHQUEUES"); + } + }; + + ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", "REFRESHQUEUES", + new HashMap<String, String>() { + { + put("forceRefreshConfigTags", "capacity-scheduler"); + } + }, false); + + actionRequest.getResourceFilters().add(new RequestResourceFilter("YARN", "RESOURCEMANAGER", + Collections.singletonList("c1-c6401"))); + + EasyMock.replay(hostRoleCommand, actionManager, configHelper); + + ambariManagementController.createAction(actionRequest, requestProperties); + Request request = requestCapture.getValue(); + Stage stage = request.getStages().iterator().next(); + List<ExecutionCommandWrapper> commands = stage.getExecutionCommands("c1-c6401"); + ExecutionCommand command = commands.get(0).getExecutionCommand(); + + Assert.assertTrue(MapUtils.isEmpty(command.getComponentVersionMap())); + } + @Test public void testCommandRepository() throws Exception { Cluster cluster = clusters.getCluster("c1"); http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java index 9da66f2..911c570 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java @@ -674,6 +674,9 @@ public class OrmTestHelper { repositoryVersion = repositoryVersionDAO.create(stackEntity, version, String.valueOf(System.currentTimeMillis()) + uniqueCounter.incrementAndGet(), operatingSystems); + + repositoryVersion.setResolved(true); + repositoryVersion = repositoryVersionDAO.merge(repositoryVersion); } catch (Exception ex) { LOG.error("Caught exception", ex);