AMBARI-20323. Commands timed-out on ambari host without any error logs - addendum patch (echekanskiy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b69ac43a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b69ac43a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b69ac43a Branch: refs/heads/branch-feature-AMBARI-12556 Commit: b69ac43a6a7aa5c7810bca0bc1204e6641634c35 Parents: 0471b0c Author: Eugene Chekanskiy <echekans...@hortonworks.com> Authored: Sat Mar 11 00:08:27 2017 +0200 Committer: Eugene Chekanskiy <echekans...@hortonworks.com> Committed: Sat Mar 11 00:08:27 2017 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/Controller.py | 2 +- .../src/main/python/ambari_agent/ExitHelper.py | 3 ++ .../ambari_agent/StatusCommandsExecutor.py | 36 ++++++++++++++++---- .../src/main/python/ambari_agent/main.py | 4 +-- 4 files changed, 35 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index c152f64..c1a5f1b 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -477,7 +477,7 @@ class Controller(threading.Thread): try: self.actionQueue = ActionQueue(self.config, controller=self) self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) - ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING") + ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING", can_relaunch=False) self.actionQueue.start() self.register = Register(self.config) self.heartbeat = Heartbeat(self.actionQueue, self.config, 
self.alert_scheduler_handler.collector()) http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py index e51646f..66e29e6 100644 --- a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py +++ b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py @@ -39,6 +39,9 @@ class ExitHelper(object): """ Class to cleanup resources before exiting. Replacement for atexit module. sys.exit(code) works only from threads and os._exit(code) will ignore atexit and cleanup will be ignored. + + WARNING: always import as `from ambari_agent.ExitHelper import ExitHelper`, otherwise it will be imported twice and nothing + will work as expected. """ __metaclass__ = _singleton http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py index 3f7ef4c..5c1c54a 100644 --- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -37,6 +37,9 @@ class StatusCommandsExecutor(object): self.config = config self.actionQueue = actionQueue + self._can_relaunch_lock = threading.RLock() + self._can_relaunch = True + # used to prevent queues from being used during creation of new one to prevent threads messing up with combination of # old and new queues self.usage_lock = threading.RLock() @@ -53,6 +56,16 @@ class StatusCommandsExecutor(object): self.mp_result_logs = multiprocessing.Queue() self.mp_task_queue = multiprocessing.Queue() + @property + def can_relaunch(self): + with 
self._can_relaunch_lock: + return self._can_relaunch + + @can_relaunch.setter + def can_relaunch(self, value): + with self._can_relaunch_lock: + self._can_relaunch = value + def _log_message(self, level, message, exception=None): """ Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target). @@ -163,7 +176,7 @@ class StatusCommandsExecutor(object): self._log_message(logging.ERROR, "StatusCommandsExecutor process failed with exception:", e) raise - self._log_message(logging.WARN, "StatusCommandsExecutor subprocess finished") + self._log_message(logging.INFO, "StatusCommandsExecutor subprocess finished") def _set_timed_out(self, command): """ @@ -242,23 +255,32 @@ class StatusCommandsExecutor(object): :param reason: reason of restart :return: """ - self.kill(reason) - self.worker_process = multiprocessing.Process(target=self._worker_process_target) - self.worker_process.start() - logger.info("Started process with pid {0}".format(self.worker_process.pid)) + if self.can_relaunch: + self.kill(reason) + self.worker_process = multiprocessing.Process(target=self._worker_process_target) + self.worker_process.start() + logger.info("Started process with pid {0}".format(self.worker_process.pid)) + else: + logger.debug("Relaunch does not allowed, can not relaunch") - def kill(self, reason=None): + def kill(self, reason=None, can_relaunch=True): """ Tries to stop command executor internal process for a short time, otherwise killing it. Closing all possible queues to unblock threads that are probably blocked on read or write operations to queues. Must be called from threads different from threads that are calling read or write methods(get_log_messages, get_results, put_commands). 
+ :param can_relaunch: indicates if StatusCommandsExecutor can be relaunched after this kill :param reason: reason of killing :return: """ + logger.info("Killing child process reason:" + str(reason)) + self.can_relaunch = can_relaunch + + if not self.can_relaunch: + logger.info("Killing without possibility to relaunch...") + # try graceful stop, otherwise hard-kill if self.worker_process and self.worker_process.is_alive(): - logger.info("Killing child process reason:" + str(reason)) self.mustDieEvent.set() self.worker_process.join(timeout=3) if self.worker_process.is_alive(): http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index ade9e4f..ddef473 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -89,7 +89,7 @@ from NetUtil import NetUtil from PingPortListener import PingPortListener import hostname from DataCleaner import DataCleaner -from ExitHelper import ExitHelper +from ambari_agent.ExitHelper import ExitHelper import socket from ambari_commons import OSConst, OSCheck from ambari_commons.shell import shellRunner @@ -336,7 +336,7 @@ def run_threads(server_hostname, heartbeat_stop_callback): if controller.get_status_commands_executor().need_relaunch: controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED") - controller.get_status_commands_executor().kill() + controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False) # event - event, that will be passed to Controller and NetUtil to make it possible to interrupt loops from outside the process # we need this for windows os, where no sigterm is available