AMBARI-18629. HDFS goes down after installing cluster (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d3a3dd5c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d3a3dd5c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d3a3dd5c Branch: refs/heads/branch-2.5 Commit: d3a3dd5c26dfe2b70c228467e11df05a3d8d5558 Parents: be3a65d Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Tue Oct 18 18:35:20 2016 +0300 Committer: Andrew Onishuk <aonis...@hortonworks.com> Committed: Tue Oct 18 18:35:20 2016 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 26 +++++------- .../main/python/ambari_commons/thread_utils.py | 43 ++++++++++++++++++++ 2 files changed, 53 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d3a3dd5c/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index c03ee4f..5962d94 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -36,6 +36,7 @@ from CommandStatusDict import CommandStatusDict from CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle from ambari_commons.str_utils import split_on_chunks +from ambari_commons.thread_utils import terminate_thread logger = logging.getLogger() @@ -83,7 +84,6 @@ class ActionQueue(threading.Thread): self.controller = controller self.configTags = {} self._stop = threading.Event() - self.hangingStatusCommands = {} self.tmpdir = config.get('agent', 'prefix') self.customServiceOrchestrator = CustomServiceOrchestrator(config, 
controller) self.parallel_execution = config.get_parallel_exec_option() @@ -230,22 +230,16 @@ class ActionQueue(threading.Thread): elif commandType == self.STATUS_COMMAND: component_name = command['componentName'] - if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive(): - del self.hangingStatusCommands[component_name] + thread = threading.Thread(target = self.execute_status_command, args = (command,)) + thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping + thread.start() + thread.join(timeout=self.status_command_timeout) - if not component_name in self.hangingStatusCommands: - thread = threading.Thread(target = self.execute_status_command, args = (command,)) - thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping - thread.start() - thread.join(timeout=self.status_command_timeout) - - if thread.isAlive(): - # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent. - PythonReflectiveExecutor.last_context.revert() - logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout)) - self.hangingStatusCommands[component_name] = thread - else: - logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name)) + if thread.isAlive(): + terminate_thread(thread) + # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent. + PythonReflectiveExecutor.last_context.revert() + logger.warn("Command {0} for {1} was running for more than {2} seconds. 
Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout)) else: logger.error("Unrecognized command " + pprint.pformat(command)) except Exception: http://git-wip-us.apache.org/repos/asf/ambari/blob/d3a3dd5c/ambari-common/src/main/python/ambari_commons/thread_utils.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_commons/thread_utils.py b/ambari-common/src/main/python/ambari_commons/thread_utils.py new file mode 100644 index 0000000..952022c --- /dev/null +++ b/ambari-common/src/main/python/ambari_commons/thread_utils.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +def terminate_thread(thread): + """Terminates a python thread abruptly from another thread. + + This is considered a bad pattern. + If possible, please consider handling stopping of the thread from inside of it + or creating the thread as a separate process (multiprocessing module).
+ + :param thread: a threading.Thread instance + """ + import ctypes + if not thread.isAlive(): + return + + exc = ctypes.py_object(SystemExit) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread.ident), exc) + if res == 0: + raise ValueError("nonexistent thread id") + elif res > 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) + raise SystemError("PyThreadState_SetAsyncExc failed") \ No newline at end of file