AMBARI-20632. With multi-process StatusCommandsExecutor, Status commands are taking too long to report back (echekanskiy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f8827fea Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f8827fea Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f8827fea Branch: refs/heads/branch-feature-AMBARI-12556 Commit: f8827fea405482289a8849ebe2a7dcf2fee11294 Parents: f389293 Author: Eugene Chekanskiy <[email protected]> Authored: Mon Apr 3 16:46:27 2017 +0300 Committer: Eugene Chekanskiy <[email protected]> Committed: Mon Apr 3 16:46:27 2017 +0300 ---------------------------------------------------------------------- .../ambari_agent/StatusCommandsExecutor.py | 209 +++++++++---------- .../src/main/python/ambari_agent/main.py | 5 +- 2 files changed, 98 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f8827fea/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py index 04a3e85..142e7ca 100644 --- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -49,7 +49,7 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor): self.config = config self.actionQueue = actionQueue self.statusCommandQueue = Queue.Queue() - self.need_relaunch = False + self.need_relaunch = (False, None) # tuple (bool, str|None) with flag to relaunch and reason of relaunch def put_commands(self, commands): with self.statusCommandQueue.mutex: @@ -88,12 +88,13 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): self.config = config self.actionQueue = actionQueue - self._can_relaunch_lock = threading.RLock() - self._can_relaunch = True + self.can_relaunch = True # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of # old and new queues self.usage_lock = threading.RLock() + # protects against simultaneous killing/creating from different threads. + self.kill_lock = threading.RLock() self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator @@ -107,42 +108,32 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): self.mp_result_logs = multiprocessing.Queue() self.mp_task_queue = multiprocessing.Queue() - @property - def can_relaunch(self): - with self._can_relaunch_lock: - return self._can_relaunch - - @can_relaunch.setter - def can_relaunch(self, value): - with self._can_relaunch_lock: - self._can_relaunch = value - - def _log_message(self, level, message, exception=None): - """ - Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target). - - :param level: - :param message: - :param exception: - :return: + def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001): """ - result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message - self.mp_result_logs.put((level, result_message, exception)) - - def _get_log_messages(self): - """ - Returns list of (level, message, exception) log messages. - - :return: list of (level, message, exception) + Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains + extremely dumb protection against blocking too much at this method: will try to get all possible items for not more + than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised + ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read + ~4500 ``range(1,10000)`` objects for 5 seconds. So don't fill queue too fast. + + :param target_queue: queue to read from + :param max_time: maximum time to spend in this method call + :param max_empty_count: maximum allowed ``Queue.Empty`` in a row + :param read_break: time to wait before next read cycle iteration + :return: list of resulting objects """ results = [] + _empty = 0 + _start = time.time() with self.usage_lock: try: - while not self.mp_result_logs.empty(): + while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count: try: - results.append(self.mp_result_logs.get(False)) + results.append(target_queue.get(False)) + _empty = 0 + time.sleep(read_break) # sleep a little to get more accurate empty and qsize results except Queue.Empty: - pass + _empty += 1 except IOError: pass except UnicodeDecodeError: @@ -151,11 +142,23 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): pass return results + def _log_message(self, level, message, exception=None): + """ + Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target). + + :param level: + :param message: + :param exception: + :return: + """ + result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message + self.mp_result_logs.put((level, result_message, exception)) + def _process_logs(self): """ Get all available at this moment logs and prints them to logger. """ - for level, message, exception in self._get_log_messages(): + for level, message, exception in self._drain_queue(self.mp_result_logs): if level == logging.ERROR: logger.debug(message, exc_info=exception) if level == logging.WARN: @@ -256,16 +259,6 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): :return: """ with self.usage_lock: - if not self.mp_task_queue.empty(): - status_command_queue_size = 0 - try: - while not self.mp_task_queue.empty(): - self.mp_task_queue.get(False) - status_command_queue_size += 1 - except Queue.Empty: - pass - - logger.info("Number of status commands removed from queue : " + str(status_command_queue_size)) for command in commands: logger.info("Adding " + command['commandType'] + " for component " + \ command['componentName'] + " of service " + \ @@ -276,43 +269,29 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): def process_results(self): """ - Process all the results from the internal worker + Process all the results from the SCE worker process. """ self._process_logs() - for result in self._get_results(): + results = self._drain_queue(self.mp_result_queue) + logger.debug("Drained %s status commands results, ~%s remains in queue", len(results), self.mp_result_queue.qsize()) + for result in results: try: self.actionQueue.process_status_command_result(result) except UnicodeDecodeError: pass - def _get_results(self): - """ - Get all available results for status commands. - - :return: list of results - """ - results = [] - with self.usage_lock: - try: - while not self.mp_result_queue.empty(): - try: - results.append(self.mp_result_queue.get(False)) - except Queue.Empty: - pass - except IOError: - pass - except UnicodeDecodeError: - pass - except IOError: - pass - return results - @property def need_relaunch(self): """ Indicates if process need to be relaunched due to timeout or it is dead or even was not created. + + :return: tuple (bool, str|None) with flag to relaunch and reason of relaunch """ - return self.timedOutEvent.is_set() or not self.worker_process or not self.worker_process.is_alive() + if not self.worker_process or not self.worker_process.is_alive(): + return True, "WORKER_DEAD" + elif self.timedOutEvent.is_set(): + return True, "COMMAND_TIMEOUT" + return False, None def relaunch(self, reason=None): """ @@ -321,13 +300,15 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): :param reason: reason of restart :return: """ - if self.can_relaunch: - self.kill(reason) - self.worker_process = multiprocessing.Process(target=self._worker_process_target) - self.worker_process.start() - logger.info("Started process with pid {0}".format(self.worker_process.pid)) - else: - logger.debug("Relaunch does not allowed, can not relaunch") + with self.kill_lock: + logger.info("Relaunching child process reason:" + str(reason)) + if self.can_relaunch: + self.kill(reason) + self.worker_process = multiprocessing.Process(target=self._worker_process_target) + self.worker_process.start() + logger.info("Started process with pid {0}".format(self.worker_process.pid)) + else: + logger.debug("Relaunch does not allowed, can not relaunch") def kill(self, reason=None, can_relaunch=True): """ @@ -339,43 +320,43 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): :param reason: reason of killing :return: """ - logger.info("Killing child process reason:" + str(reason)) - self.can_relaunch = can_relaunch - - if not self.can_relaunch: - logger.info("Killing without possibility to relaunch...") - - # try graceful stop, otherwise hard-kill - if self.worker_process and self.worker_process.is_alive(): - self.mustDieEvent.set() - self.worker_process.join(timeout=3) - if self.worker_process.is_alive(): - os.kill(self.worker_process.pid, signal.SIGKILL) - logger.info("Child process killed by -9") + with self.kill_lock: + self.can_relaunch = can_relaunch + + if not self.can_relaunch: + logger.info("Killing without possibility to relaunch...") + + # try graceful stop, otherwise hard-kill + if self.worker_process and self.worker_process.is_alive(): + self.mustDieEvent.set() + self.worker_process.join(timeout=3) + if self.worker_process.is_alive(): + os.kill(self.worker_process.pid, signal.SIGKILL) + logger.info("Child process killed by -9") + else: + # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases + # this call will do nothing, as all logs will be processed in ActionQueue loop + self._process_logs() + logger.info("Child process died gracefully") else: - # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases - # this call will do nothing, as all logs will be processed in ActionQueue loop - self._process_logs() - logger.info("Child process died gracefully") - else: - logger.info("Child process already dead") - - # close queues and acquire usage lock - # closing both sides of pipes here, we need this hack in case of blocking on recv() call - self.mp_result_queue.close() - self.mp_result_queue._writer.close() - self.mp_result_logs.close() - self.mp_result_logs._writer.close() - self.mp_task_queue.close() - self.mp_task_queue._writer.close() - - with self.usage_lock: - self.mp_result_queue.join_thread() - self.mp_result_queue = multiprocessing.Queue() - self.mp_task_queue.join_thread() - self.mp_task_queue = multiprocessing.Queue() - self.mp_result_logs.join_thread() - self.mp_result_logs = multiprocessing.Queue() - self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator - self.mustDieEvent.clear() - self.timedOutEvent.clear() + logger.info("Child process already dead") + + # close queues and acquire usage lock + # closing both sides of pipes here, we need this hack in case of blocking on recv() call + self.mp_result_queue.close() + self.mp_result_queue._writer.close() + self.mp_result_logs.close() + self.mp_result_logs._writer.close() + self.mp_task_queue.close() + self.mp_task_queue._writer.close() + + with self.usage_lock: + self.mp_result_queue.join_thread() + self.mp_result_queue = multiprocessing.Queue() + self.mp_task_queue.join_thread() + self.mp_task_queue = multiprocessing.Queue() + self.mp_result_logs.join_thread() + self.mp_result_logs = multiprocessing.Queue() + self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator + self.mustDieEvent.clear() + self.timedOutEvent.clear() http://git-wip-us.apache.org/repos/asf/ambari/blob/f8827fea/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 923c570..19c92b0 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -352,8 +352,9 @@ def run_threads(server_hostname, heartbeat_stop_callback): while controller.is_alive(): time.sleep(0.1) - if controller.get_status_commands_executor().need_relaunch: - controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED") + need_relaunch, reason = controller.get_status_commands_executor().need_relaunch + if need_relaunch: + controller.get_status_commands_executor().relaunch(reason) controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)
