AMBARI-21544. HiveServer2 fails to start with webhdfs call to create /hdp/apps/..jar files fails with org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cfd7bb4c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cfd7bb4c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cfd7bb4c
Branch: refs/heads/branch-feature-AMBARI-14714
Commit: cfd7bb4cbe4b0af23d92dc64b81ec1cdedc4241d
Parents: da21afc
Author: Andrew Onishuk <aonis...@hortonworks.com>
Authored: Fri Jul 21 12:39:01 2017 +0300
Committer: Andrew Onishuk <aonis...@hortonworks.com>
Committed: Fri Jul 21 12:39:01 2017 +0300
----------------------------------------------------------------------
 .../libraries/providers/hdfs_resource.py | 48 +++++++++++++++++++-
 1 file changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/cfd7bb4c/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
index efca23d..0c45719 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
@@ -61,6 +61,11 @@ RESOURCE_TO_JSON_FIELDS = {
   'dfs_type': 'dfs_type'
 }
 
+EXCEPTIONS_TO_RETRY = {
+  # "ExceptionName": (total attempts including the first call, seconds to sleep between attempts)
+  "LeaseExpiredException": (20, 6),
+}
+
 class HdfsResourceJar:
   """
   This is slower than HdfsResourceWebHDFS implementation of HdfsResouce, but it works in any cases on any DFS types.
@@ -132,6 +137,17 @@ class HdfsResourceJar:
     # Clean
     env.config['hdfs_files'] = []
 
+
+class WebHDFSCallException(Fail):
+  def __init__(self, message, result_message):
+    self.result_message = result_message
+    super(WebHDFSCallException, self).__init__(message)
+
+  def get_exception_name(self):
+    if isinstance(self.result_message, dict) and "RemoteException" in self.result_message and "exception" in self.result_message["RemoteException"]:
+      return self.result_message["RemoteException"]["exception"]
+    return None
+
 class WebHDFSUtil:
   def __init__(self, hdfs_site, run_user, security_enabled, logoutput=None):
     https_nn_address = namenode_ha_utils.get_property_for_active_namenode(hdfs_site, 'dfs.namenode.https-address',
@@ -153,8 +169,36 @@ class WebHDFSUtil:
     # only hdfs seems to support webHDFS
     return (is_webhdfs_enabled and default_fs.startswith("hdfs"))
 
+  def run_command(self, *args, **kwargs):
+    """
+    Wrapper for self._run_command that retries calls failing with an exception listed in EXCEPTIONS_TO_RETRY.
+    """
+    try:
+      return self._run_command(*args, **kwargs)
+    except WebHDFSCallException as ex:
+      exception_name = ex.get_exception_name()
+      if exception_name in EXCEPTIONS_TO_RETRY:
+        try_count, try_sleep = EXCEPTIONS_TO_RETRY[exception_name]
+        last_exception = ex
+      else:
+        raise
+
+    while True:
+      # try_count is the total number of attempts, including the call above; give up (re-raise) without a final sleep
+      try_count -= 1
+      if try_count <= 0:
+        raise last_exception
+
+      Logger.info("Retrying after {0} seconds. Reason: {1}".format(try_sleep, str(last_exception)))
+      time.sleep(try_sleep)
+
+      try:
+        return self._run_command(*args, **kwargs)
+      except WebHDFSCallException as ex:
+        last_exception = ex
+
   valid_status_codes = ["200", "201"]
-  def run_command(self, target, operation, method='POST', assertable_result=True, file_to_put=None, ignore_status_codes=[], **kwargs):
+  def _run_command(self, target, operation, method='POST', assertable_result=True, file_to_put=None, ignore_status_codes=[], **kwargs):
     """
     assertable_result - some POST requests return '{"boolean":false}' or '{"boolean":true}'
     depending on if query was successful or not, we can assert this for them
@@ -201,7 +245,7 @@ class WebHDFSUtil:
       formatted_output = json.dumps(result_dict, indent=2) if isinstance(result_dict, dict) else result_dict
       formatted_output = err + "\n" + formatted_output
       err_msg = "Execution of '%s' returned status_code=%s. %s" % (shell.string_cmd_from_args_list(cmd), status_code, formatted_output)
-      raise Fail(err_msg)
+      raise WebHDFSCallException(err_msg, result_dict)
 
     return result_dict
 