Repository: ambari Updated Branches: refs/heads/branch-2.0.maint 15d3f0d29 -> fc304d985
AMBARI-11039. RU wrong version of tez.tar.gz uploaded to HDFS (alejandro) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fc304d98 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fc304d98 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fc304d98 Branch: refs/heads/branch-2.0.maint Commit: fc304d985e4ed0802b31c0c488344c8c3e5b9e85 Parents: 15d3f0d Author: Alejandro Fernandez <afernan...@hortonworks.com> Authored: Fri May 8 19:08:43 2015 -0700 Committer: Alejandro Fernandez <afernan...@hortonworks.com> Committed: Mon May 11 11:12:49 2015 -0700 ---------------------------------------------------------------------- .../dynamic_variable_interpretation.py | 95 +++++++++++++------- 1 file changed, 61 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/fc304d98/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py b/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py index 1e70219..b32feb3 100644 --- a/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py +++ b/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py @@ -20,11 +20,9 @@ limitations under the License. __all__ = ["copy_tarballs_to_hdfs", ] import os -import glob import re import tempfile import uuid -from resource_management.libraries.functions.default import default from resource_management.libraries.functions.format import format from resource_management.libraries.resources.copy_from_local import CopyFromLocal from resource_management.libraries.resources.execute_hadoop import ExecuteHadoop @@ -33,6 +31,35 @@ from resource_management.core.exceptions import Fail from resource_management.core.logger import Logger from resource_management.core import shell + +# For a given stack, define a component, such as tez or hive, with a 2-tuple that defines +# (a, b), where +# a = source file to use +# b = destination folder to copy file to in HDFS. +# {{ hdp_stack_version }} is dynamically interpreted based on the version +SOURCE_TO_DESTINATION = {"HDP": + { + "tez": + ("/usr/hdp/{{ hdp_stack_version }}/tez/lib/tez.tar.gz", + "/hdp/apps/{{ hdp_stack_version }}/tez/"), + "hive": + ("/usr/hdp/{{ hdp_stack_version }}/hive/hive.tar.gz", + "/hdp/apps/{{ hdp_stack_version }}/hive/"), + "pig": + ("/usr/hdp/{{ hdp_stack_version }}/pig/pig.tar.gz", + "/hdp/apps/{{ hdp_stack_version }}/pig/"), + "hadoop-streaming": + ("/usr/hdp/{{ hdp_stack_version }}/hadoop-mapreduce/hadoop-streaming.jar", + "/hdp/apps/{{ hdp_stack_version }}/mapreduce/"), + "sqoop": + ("/usr/hdp/{{ hdp_stack_version }}/sqoop/sqoop.tar.gz", + "/hdp/apps/{{ hdp_stack_version }}/sqoop/"), + "mapreduce": + ("/usr/hdp/{{ hdp_stack_version }}/hadoop/mapreduce.tar.gz", + "/hdp/apps/{{ hdp_stack_version }}/mapreduce/") + } +} + """ This file provides helper methods needed for the versioning of RPMs. Specifically, it does dynamic variable interpretation to replace strings like {{ hdp_stack_version }} where the value of the @@ -43,42 +70,37 @@ E.g., 998.2.2.1.0-998 Please note that "-${build_number}" is optional. """ -# These values must be the suffix of the properties in cluster-env.xml -TAR_SOURCE_SUFFIX = "_tar_source" -TAR_DESTINATION_FOLDER_SUFFIX = "_tar_destination_folder" - -def _get_tar_source_and_dest_folder(tarball_prefix): +def _get_tar_source_and_dest_folder(stack_name, tarball_prefix): """ + :param stack_name: Stack name, such as "HDP" :param tarball_prefix: Prefix of the tarball must be one of tez, hive, mr, pig - :return: Returns a tuple of (x, y) after verifying the properties + :return: Returns a tuple of (source_file, destination_folder) after verifying the properties """ - component_tar_source_file = default("/configurations/cluster-env/%s%s" % (tarball_prefix.lower(), TAR_SOURCE_SUFFIX), None) - # E.g., /usr/hdp/current/hadoop-client/tez-{{ hdp_stack_version }}.tar.gz - - component_tar_destination_folder = default("/configurations/cluster-env/%s%s" % (tarball_prefix.lower(), TAR_DESTINATION_FOLDER_SUFFIX), None) - # E.g., hdfs:///hdp/apps/{{ hdp_stack_version }}/mapreduce/ + if stack_name is None or stack_name not in SOURCE_TO_DESTINATION: + Logger.warning("Did not find stack_name %s in dictionary." % str(stack_name)) + return None, None - if not component_tar_source_file or not component_tar_destination_folder: - Logger.warning("Did not find %s tar source file and destination folder properties in cluster-env.xml" % - tarball_prefix) + if tarball_prefix is None or tarball_prefix not in SOURCE_TO_DESTINATION[stack_name]: + Logger.warning("Did not find tarball prefix %s in dictionary for stack %s." % (str(tarball_prefix), str(stack_name))) return None, None - if component_tar_source_file.find("/") == -1: - Logger.warning("The tar file path %s is not valid" % str(component_tar_source_file)) + (source_file, destination_folder) = SOURCE_TO_DESTINATION[stack_name][tarball_prefix] + + if source_file.find("/") == -1: + Logger.warning("The tar file path %s is not valid" % str(source_file)) return None, None - if not component_tar_destination_folder.endswith("/"): - component_tar_destination_folder = component_tar_destination_folder + "/" + if not destination_folder.endswith("/"): + destination_folder = destination_folder + "/" - return component_tar_source_file, component_tar_destination_folder + return source_file, destination_folder -def _copy_files(source_and_dest_pairs, component_user, file_owner, group_owner, kinit_if_needed): +def _copy_files(source_and_dest_pairs, file_owner, group_owner, kinit_if_needed): """ :param source_and_dest_pairs: List of tuples (x, y), where x is the source file in the local file system, and y is the destination file path in HDFS - :param component_user: User that will execute the Hadoop commands, usually smokeuser :param file_owner: Owner to set for the file copied to HDFS (typically hdfs account) :param group_owner: Owning group to set for the file copied to HDFS (typically hadoop group) :param kinit_if_needed: kinit command if it is needed, otherwise an empty string @@ -152,17 +174,17 @@ def copy_tarballs_to_hdfs(tarball_prefix, hdp_select_component_name, component_u """ import params - if not hasattr(params, "hdp_stack_version") or params.hdp_stack_version is None: - Logger.warning("Could not find hdp_stack_version") + if not hasattr(params, "stack_name") or params.stack_name is None: + Logger.warning("Could not find stack_name in params") return 1 - component_tar_source_file, component_tar_destination_folder = _get_tar_source_and_dest_folder(tarball_prefix) - if not component_tar_source_file or not component_tar_destination_folder: - Logger.warning("Could not retrieve properties for tarball with prefix: %s" % str(tarball_prefix)) + if not hasattr(params, "hdp_stack_version") or params.hdp_stack_version is None: + Logger.warning("Could not find hdp_stack_version in params") return 1 - if not os.path.exists(component_tar_source_file): - Logger.warning("Could not find file: %s" % str(component_tar_source_file)) + source_file, destination_folder = _get_tar_source_and_dest_folder(params.stack_name, tarball_prefix) + if not source_file or not destination_folder: + Logger.warning("Could not retrieve properties for tarball with prefix: %s" % str(tarball_prefix)) return 1 # Ubuntu returns: "stdin: is not a tty", as subprocess output. @@ -185,8 +207,13 @@ def copy_tarballs_to_hdfs(tarball_prefix, hdp_select_component_name, component_u Logger.error("Could not parse HDP version from output of hdp-select: %s" % str(out)) return 1 - file_name = os.path.basename(component_tar_source_file) - destination_file = os.path.join(component_tar_destination_folder, file_name) + source_file = source_file.replace("{{ hdp_stack_version }}", hdp_version) + if not os.path.exists(source_file): + Logger.warning("Could not find file: %s" % str(source_file)) + return 1 + + file_name = os.path.basename(source_file) + destination_file = os.path.join(destination_folder, file_name) destination_file = destination_file.replace("{{ hdp_stack_version }}", hdp_version) does_hdfs_file_exist_cmd = "fs -ls %s" % destination_file @@ -214,6 +241,6 @@ def copy_tarballs_to_hdfs(tarball_prefix, hdp_select_component_name, component_u pass if not does_hdfs_file_exist: - source_and_dest_pairs = [(component_tar_source_file, destination_file), ] - return _copy_files(source_and_dest_pairs, component_user, file_owner, group_owner, kinit_if_needed) + source_and_dest_pairs = [(source_file, destination_file), ] + return _copy_files(source_and_dest_pairs, file_owner, group_owner, kinit_if_needed) return 1