AMBARI-22628 - YARN Shuffle Service Can't Be Found On Client-Only Nodes After 
New Cluster Install (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f844afdc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f844afdc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f844afdc

Branch: refs/heads/branch-feature-AMBARI-22008-isilon
Commit: f844afdcdfc5a397ddc61c602026a8887ea9566c
Parents: 51e2058
Author: Jonathan Hurley <jhur...@hortonworks.com>
Authored: Mon Dec 11 09:03:09 2017 -0500
Committer: Attila Magyar <amag...@hortonworks.com>
Committed: Mon Dec 18 09:12:46 2017 +0100

----------------------------------------------------------------------
 .../ambari/server/agent/ExecutionCommand.java   | 37 ++++++----------
 .../internal/ClientConfigResourceProvider.java  |  2 +
 .../org/apache/ambari/server/state/Cluster.java |  9 ++++
 .../server/state/cluster/ClusterImpl.java       | 31 +++++++++++++
 .../2.1.0.2.0/package/scripts/params_linux.py   | 41 +++++++++++++++--
 .../3.0.0.3.0/package/scripts/params_linux.py   | 41 +++++++++++++++--
 .../ExecutionCommandWrapperTest.java            |  5 +++
 .../AmbariCustomCommandExecutionHelperTest.java | 46 ++++++++++++++++++++
 .../apache/ambari/server/orm/OrmTestHelper.java |  3 ++
 9 files changed, 183 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 5ee4bf6..9093985 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -29,8 +29,6 @@ import org.apache.ambari.annotations.ExperimentalFeature;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Service;
-import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.UpgradeContext.UpgradeSummary;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
@@ -553,30 +551,21 @@ public class ExecutionCommand extends AgentCommand {
   }
 
   /**
-   * Used to set a map of {service -> { component -> version}}.  This is 
necessary when performing
-   * an upgrade to correct build paths of required binaries.
-   * @param cluster the cluster from which to build the map
+   * Used to set a map of {service -> { component -> version}}. This is
+   * necessary when performing an upgrade to correct build paths of required
+   * binaries. This method will only set the version information for a 
component
+   * if:
+   * <ul>
+   * <li>The component advertises a version</li>
+   * <li>The repository for the component has been resolved and the version can
+   * be trusted</li>
+   * </ul>
+   *
+   * @param cluster
+   *          the cluster from which to build the map
    */
   public void setComponentVersions(Cluster cluster) throws AmbariException {
-    Map<String, Map<String, String>> componentVersionMap = new HashMap<>();
-
-    for (Service service : cluster.getServices().values()) {
-      Map<String, String> componentMap = new HashMap<>();
-
-      boolean shouldSet = false;
-      for (ServiceComponent component : 
service.getServiceComponents().values()) {
-        if (component.isVersionAdvertised()) {
-          shouldSet = true;
-          componentMap.put(component.getName(), component.getDesiredVersion());
-        }
-      }
-
-      if (shouldSet) {
-        componentVersionMap.put(service.getName(), componentMap);
-      }
-    }
-
-    this.componentVersionMap = componentVersionMap;
+    componentVersionMap = cluster.getComponentVersionMap();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
index 75f47ca..dcafdea 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
@@ -458,6 +458,8 @@ public class ClientConfigResourceProvider extends 
AbstractControllerResourceProv
         jsonContent.put("clusterName", cluster.getClusterName());
         jsonContent.put("serviceName", serviceName);
         jsonContent.put("role", componentName);
+        jsonContent.put("componentVersionMap", 
cluster.getComponentVersionMap());
+
         jsonConfigurations = gson.toJson(jsonContent);
 
         File tmpDirectory = new File(TMP_PATH);

http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java 
b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 90dd611..62226af 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -673,4 +673,13 @@ public interface Cluster {
    */
   void addSuspendedUpgradeParameters(Map<String, String> commandParams,
       Map<String, String> roleParams);
+
+  /**
+   * Gets a mapping of service to component/version for every installed
+   * component in the cluster which advertises a version and for which the
+   * repository has been resolved.
+   *
+   * @return a mapping of service to component version, or an empty map.
+   */
+  Map<String, Map<String, String>> getComponentVersionMap();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index e6d42ce..8346727 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -2704,4 +2704,35 @@ public class ClusterImpl implements Cluster {
     // suspended goes in role params
     roleParams.put(KeyNames.UPGRADE_SUSPENDED, 
Boolean.TRUE.toString().toLowerCase());
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Map<String, Map<String, String>> getComponentVersionMap() {
+    Map<String, Map<String, String>> componentVersionMap = new HashMap<>();
+
+    for (Service service : getServices().values()) {
+      Map<String, String> componentMap = new HashMap<>();
+      for (ServiceComponent component : 
service.getServiceComponents().values()) {
+        // skip components which don't advertise a version
+        if (!component.isVersionAdvertised()) {
+          continue;
+        }
+
+        // if the repo isn't resolved, then we can't trust the version
+        if (!component.getDesiredRepositoryVersion().isResolved()) {
+          continue;
+        }
+
+        componentMap.put(component.getName(), component.getDesiredVersion());
+      }
+
+      if (!componentMap.isEmpty()) {
+        componentVersionMap.put(service.getName(), componentMap);
+      }
+    }
+
+    return componentVersionMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
 
b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
index 1bc01d4..9997779 100644
--- 
a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
+++ 
b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
@@ -21,6 +21,7 @@ Ambari Agent
 import os
 
 from resource_management.core import sudo
+from resource_management.core.logger import Logger
 from resource_management.libraries.script.script import Script
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
 from resource_management.libraries.functions import component_version
@@ -84,11 +85,45 @@ stack_supports_timeline_state_store = 
check_stack_feature(StackFeature.TIMELINE_
 # It cannot be used during the initial Cluser Install because the version is 
not yet known.
 version = default("/commandParams/version", None)
 
+def get_spark_version(service_name, component_name, yarn_version):
+  """
+  Attempts to calculate the correct version placeholder value for spark or 
spark2 based on
+  what is installed in the cluster. If Spark is not installed, then this value 
will need to be
+  that of YARN so it can still find the correct shuffle class.
+
+  On cluster installs, we have not yet calculated any versions and all known 
values could be None.
+  This doesn't affect daemons, but it does affect client-only hosts where they 
will never receive
+  a start command after install. Therefore, this function will attempt to use 
stack-select as a
+  last resort to get a valid value.
+  :param service_name:  the service name (SPARK, SPARK2, etc)
+  :param component_name:  the component name (SPARK_CLIENT, etc)
+  :param yarn_version:  the default version of Yarn to use if no spark is 
installed
+  :return:  a value for the version placeholder in shuffle classpath properties
+  """
+  # start off seeing if we need to populate a default value for YARN
+  if yarn_version is None:
+    yarn_version = 
component_version.get_component_repository_version(service_name = "YARN",
+      component_name = "YARN_CLIENT")
+
+  # now try to get the version of spark/spark2, defaulting to the version of 
YARN
+  shuffle_classpath_version = 
component_version.get_component_repository_version(service_name = service_name,
+    component_name = component_name, default_value = yarn_version)
+
+  # even with the default of using YARN's version, on an install this might be 
None since we haven't
+  # calculated the version of YARN yet - use stack_select as a last ditch 
effort
+  if shuffle_classpath_version is None:
+    try:
+      shuffle_classpath_version = 
stack_select.get_role_component_current_stack_version()
+    except:
+      Logger.exception("Unable to query for the correct shuffle classpath")
+
+  return shuffle_classpath_version
+
 # these are used to render the classpath for picking up Spark classes
 # in the event that spark is not installed, then we must default to the 
vesrion of YARN installed
 # since it will still load classes from its own spark version
-spark_version = 
component_version.get_component_repository_version(service_name =  "SPARK", 
component_name = "SPARK_CLIENT", default_value = version)
-spark2_version = 
component_version.get_component_repository_version(service_name = "SPARK2", 
component_name = "SPARK2_CLIENT", default_value = version)
+spark_version = get_spark_version("SPARK", "SPARK_CLIENT", version)
+spark2_version = get_spark_version("SPARK2", "SPARK2_CLIENT", version)
 
 stack_supports_ranger_kerberos = 
check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, 
version_for_stack_feature_checks)
 stack_supports_ranger_audit_db = 
check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, 
version_for_stack_feature_checks)
@@ -548,5 +583,3 @@ if enable_ranger_yarn and is_supported_yarn_ranger:
 
 # need this to capture cluster name from where ranger yarn plugin is enabled
 cluster_name = config['clusterName']
-
-# ranger yarn plugin end section

http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py
 
b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py
index b8a5382..8e9632c 100644
--- 
a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py
+++ 
b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/package/scripts/params_linux.py
@@ -21,6 +21,7 @@ Ambari Agent
 import os
 
 from resource_management.core import sudo
+from resource_management.core.logger import Logger
 from resource_management.libraries.script.script import Script
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
 from resource_management.libraries.functions import component_version
@@ -84,11 +85,45 @@ stack_supports_timeline_state_store = 
check_stack_feature(StackFeature.TIMELINE_
 # It cannot be used during the initial Cluser Install because the version is 
not yet known.
 version = default("/commandParams/version", None)
 
+def get_spark_version(service_name, component_name, yarn_version):
+  """
+  Attempts to calculate the correct version placeholder value for spark or 
spark2 based on
+  what is installed in the cluster. If Spark is not installed, then this value 
will need to be
+  that of YARN so it can still find the correct shuffle class.
+
+  On cluster installs, we have not yet calculated any versions and all known 
values could be None.
+  This doesn't affect daemons, but it does affect client-only hosts where they 
will never receive
+  a start command after install. Therefore, this function will attempt to use 
stack-select as a
+  last resort to get a valid value.
+  :param service_name:  the service name (SPARK, SPARK2, etc)
+  :param component_name:  the component name (SPARK_CLIENT, etc)
+  :param yarn_version:  the default version of Yarn to use if no spark is 
installed
+  :return:  a value for the version placeholder in shuffle classpath properties
+  """
+  # start off seeing if we need to populate a default value for YARN
+  if yarn_version is None:
+    yarn_version = 
component_version.get_component_repository_version(service_name = "YARN",
+      component_name = "YARN_CLIENT")
+
+  # now try to get the version of spark/spark2, defaulting to the version of 
YARN
+  shuffle_classpath_version = 
component_version.get_component_repository_version(service_name = service_name,
+    component_name = component_name, default_value = yarn_version)
+
+  # even with the default of using YARN's version, on an install this might be 
None since we haven't
+  # calculated the version of YARN yet - use stack_select as a last ditch 
effort
+  if shuffle_classpath_version is None:
+    try:
+      shuffle_classpath_version = 
stack_select.get_role_component_current_stack_version()
+    except:
+      Logger.exception("Unable to query for the correct shuffle classpath")
+
+  return shuffle_classpath_version
+
 # these are used to render the classpath for picking up Spark classes
 # in the event that spark is not installed, then we must default to the 
vesrion of YARN installed
 # since it will still load classes from its own spark version
-spark_version = 
component_version.get_component_repository_version(service_name =  "SPARK", 
component_name = "SPARK_CLIENT", default_value = version)
-spark2_version = 
component_version.get_component_repository_version(service_name = "SPARK2", 
component_name = "SPARK2_CLIENT", default_value = version)
+spark_version = get_spark_version("SPARK", "SPARK_CLIENT", version)
+spark2_version = get_spark_version("SPARK2", "SPARK2_CLIENT", version)
 
 stack_supports_ranger_kerberos = 
check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, 
version_for_stack_feature_checks)
 stack_supports_ranger_audit_db = 
check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, 
version_for_stack_feature_checks)
@@ -547,5 +582,3 @@ if enable_ranger_yarn and is_supported_yarn_ranger:
 
 # need this to capture cluster name from where ranger yarn plugin is enabled
 cluster_name = config['clusterName']
-
-# ranger yarn plugin end section

http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
index f12c701..10ef219 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
@@ -285,7 +285,12 @@ public class ExecutionCommandWrapperTest {
     Cluster cluster = clusters.getCluster(CLUSTER1);
 
     StackId stackId = cluster.getDesiredStackVersion();
+    
+    // set the repo version resolved state to verify that the version is not 
sent
     RepositoryVersionEntity repositoryVersion = 
ormTestHelper.getOrCreateRepositoryVersion(stackId, "0.1-0000");
+    repositoryVersion.setResolved(false);
+    ormTestHelper.repositoryVersionDAO.merge(repositoryVersion);
+
     Service service = cluster.getService("HDFS");
     service.setDesiredRepositoryVersion(repositoryVersion);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
index 26c79e6..6bece66 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
@@ -564,6 +564,52 @@ public class AmbariCustomCommandExecutionHelperTest {
     
Assert.assertTrue(command.getComponentVersionMap().containsKey("ZOOKEEPER"));
   }
 
+  /**
+   * Tests that if a component's repository is not resolved, then the component
+   * version map does not get populated.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAvailableServicesMapIsEmptyWhenRepositoriesNotResolved() 
throws Exception {
+
+    // set all repos to resolve=false to verify that we don't get a
+    // component version map
+    RepositoryVersionDAO repositoryVersionDAO = 
injector.getInstance(RepositoryVersionDAO.class);
+    List<RepositoryVersionEntity> repoVersions = 
repositoryVersionDAO.findAll();
+    for (RepositoryVersionEntity repoVersion : repoVersions) {
+      repoVersion.setResolved(false);
+      repositoryVersionDAO.merge(repoVersion);
+    }
+
+    Map<String, String> requestProperties = new HashMap<String, String>() {
+      {
+        put(REQUEST_CONTEXT_PROPERTY, "Refresh YARN Capacity Scheduler");
+        put("command", "REFRESHQUEUES");
+      }
+    };
+
+    ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", 
"REFRESHQUEUES",
+        new HashMap<String, String>() {
+          {
+            put("forceRefreshConfigTags", "capacity-scheduler");
+          }
+        }, false);
+
+    actionRequest.getResourceFilters().add(new RequestResourceFilter("YARN", 
"RESOURCEMANAGER",
+        Collections.singletonList("c1-c6401")));
+
+    EasyMock.replay(hostRoleCommand, actionManager, configHelper);
+
+    ambariManagementController.createAction(actionRequest, requestProperties);
+    Request request = requestCapture.getValue();
+    Stage stage = request.getStages().iterator().next();
+    List<ExecutionCommandWrapper> commands = 
stage.getExecutionCommands("c1-c6401");
+    ExecutionCommand command = commands.get(0).getExecutionCommand();
+
+    Assert.assertTrue(MapUtils.isEmpty(command.getComponentVersionMap()));
+  }
+
   @Test
   public void testCommandRepository() throws Exception {
     Cluster cluster = clusters.getCluster("c1");

http://git-wip-us.apache.org/repos/asf/ambari/blob/f844afdc/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
index 9da66f2..911c570 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
@@ -674,6 +674,9 @@ public class OrmTestHelper {
 
         repositoryVersion = repositoryVersionDAO.create(stackEntity, version,
             String.valueOf(System.currentTimeMillis()) + 
uniqueCounter.incrementAndGet(), operatingSystems);
+
+        repositoryVersion.setResolved(true);
+        repositoryVersion = repositoryVersionDAO.merge(repositoryVersion);
       } catch (Exception ex) {
         LOG.error("Caught exception", ex);
 

Reply via email to