AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene 
Chekanskiy via smohanty)


Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 5ef0c99a9d477b63f4e7213d058c9ab2d3ac2feb
Parents: ef34cb4
Author: Sumit Mohanty <>
Authored: Wed Apr 12 12:35:15 2017 -0700
Committer: Sumit Mohanty <>
Committed: Wed Apr 12 12:35:15 2017 -0700

 .../ambari_agent/      | 279 +------------------
 1 file changed, 2 insertions(+), 277 deletions(-)
diff --git 
index 142e7ca..f42e134 100644
--- a/ambari-agent/src/main/python/ambari_agent/
+++ b/ambari-agent/src/main/python/ambari_agent/
@@ -83,280 +83,5 @@ class 
   def kill(self, reason=None, can_relaunch=True):
-class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
-  def __init__(self, config, actionQueue):
-    self.config = config
-    self.actionQueue = actionQueue
-    self.can_relaunch = True
-    # used to prevent queues from been used during creation of new one to 
prevent threads messing up with combination of
-    # old and new queues
-    self.usage_lock = threading.RLock()
-    # protects against simultaneous killing/creating from different threads.
-    self.kill_lock = threading.RLock()
-    self.status_command_timeout = int(self.config.get('agent', 
'status_command_timeout', 5))
-    self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-    self.worker_process = None
-    self.mustDieEvent = multiprocessing.Event()
-    self.timedOutEvent = multiprocessing.Event()
-    # multiprocessing stuff that need to be cleaned every time
-    self.mp_result_queue = multiprocessing.Queue()
-    self.mp_result_logs = multiprocessing.Queue()
-    self.mp_task_queue = multiprocessing.Queue()
-  def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, 
-    """
-    Read everything that available in queue. Using not reliable 
multiprocessing.Queue methods(qsize, empty), so contains
-    extremely dumb protection against blocking too much at this method: will 
try to get all possible items for not more
-    than ``max_time`` seconds; will return after ``max_empty_count`` calls of 
``target_queue.get(False)`` that raised
-    ``Queue.Empty`` exception. Notice ``read_break`` argument, with default 
values this method will be able to read
-    ~4500 ``range(1,10000)`` objects for 5 seconds. So don't fill queue too 
-    :param target_queue: queue to read from
-    :param max_time: maximum time to spend in this method call
-    :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
-    :param read_break: time to wait before next read cycle iteration
-    :return: list of resulting objects
-    """
-    results = []
-    _empty = 0
-    _start = time.time()
-    with self.usage_lock:
-      try:
-        while (not target_queue.empty() or target_queue.qsize() > 0) and 
time.time() - _start < max_time and _empty < max_empty_count:
-          try:
-            results.append(target_queue.get(False))
-            _empty = 0
-            time.sleep(read_break) # sleep a little to get more accurate empty 
and qsize results
-          except Queue.Empty:
-            _empty += 1
-          except IOError:
-            pass
-          except UnicodeDecodeError:
-            pass
-      except IOError:
-        pass
-    return results
-  def _log_message(self, level, message, exception=None):
-    """
-    Put log message to logging queue. Must be used only for logging from child 
process(in _worker_process_target).
-    :param level:
-    :param message:
-    :param exception:
-    :return:
-    """
-    result_message = "StatusCommandExecutor reporting at {0}: 
".format(time.time()) + message
-    self.mp_result_logs.put((level, result_message, exception))
-  def _process_logs(self):
-    """
-    Get all available at this moment logs and prints them to logger.
-    """
-    for level, message, exception in self._drain_queue(self.mp_result_logs):
-      if level == logging.ERROR:
-        logger.debug(message, exc_info=exception)
-      if level == logging.WARN:
-        logger.warn(message)
-      if level == logging.INFO:
-  def _worker_process_target(self):
-    """
-    Internal method that running in separate process.
-    """
-    # cleanup monkey-patching results in child process, as it causing problems
-    import subprocess
-    reload(subprocess)
-    import multiprocessing
-    reload(multiprocessing)
-    bind_debug_signal_handlers()
-    self._log_message(logging.INFO, "StatusCommandsExecutor process started")
-    # region StatusCommandsExecutor process internals
-    internal_in_queue = Queue.Queue()
-    internal_out_queue = Queue.Queue()
-    def _internal_worker():
-      """
-      thread that actually executes status commands
-      """
-      while True:
-        _cmd = internal_in_queue.get()
-    worker = threading.Thread(target=_internal_worker)
-    worker.daemon = True
-    worker.start()
-    def _internal_process_command(_command):
-      internal_in_queue.put(_command)
-      start_time = time.time()
-      result = None
-      while not self.mustDieEvent.is_set() and not result and time.time() - 
start_time < self.status_command_timeout:
-        try:
-          result = internal_out_queue.get(timeout=1)
-        except Queue.Empty:
-          pass
-      if result:
-        self.mp_result_queue.put(result)
-        return True
-      else:
-        # do not set timed out event twice
-        if not self.timedOutEvent.is_set():
-          self._set_timed_out(_command)
-        return False
-    # endregion
-    try:
-      while not self.mustDieEvent.is_set():
-        try:
-          command = self.mp_task_queue.get(False)
-        except Queue.Empty:
-          # no command, lets try in other loop iteration
-          time.sleep(.1)
-          continue
-        self._log_message(logging.DEBUG, "Running status command for 
-        if _internal_process_command(command):
-          self._log_message(logging.DEBUG, "Completed status command for 
-    except Exception as e:
-      self._log_message(logging.ERROR, "StatusCommandsExecutor process failed 
with exception:", e)
-      raise
-    self._log_message(logging.INFO, "StatusCommandsExecutor subprocess 
-  def _set_timed_out(self, command):
-    """
-    Set timeout event and adding log entry for given command.
-    :param command:
-    :return:
-    """
-    msg = "Command {0} for {1} is running for more than {2} seconds. 
Terminating it due to timeout.".format(
-        command['commandType'],
-        command['componentName'],
-        self.status_command_timeout
-    )
-    self._log_message(logging.WARN, msg)
-    self.timedOutEvent.set()
-  def put_commands(self, commands):
-    """
-    Put given commands to command executor.
-    :param commands: status commands to execute
-    :return:
-    """
-    with self.usage_lock:
-      for command in commands:
-"Adding " + command['commandType'] + " for component " + \
-                    command['componentName'] + " of service " + \
-                    command['serviceName'] + " of cluster " + \
-                    command['clusterName'] + " to the queue.")
-        self.mp_task_queue.put(command)
-        logger.debug(pprint.pformat(command))
-  def process_results(self):
-    """
-    Process all the results from the SCE worker process.
-    """
-    self._process_logs()
-    results = self._drain_queue(self.mp_result_queue)
-    logger.debug("Drained %s status commands results, ~%s remains in queue", 
len(results), self.mp_result_queue.qsize())
-    for result in results:
-      try:
-        self.actionQueue.process_status_command_result(result)
-      except UnicodeDecodeError:
-        pass
-  @property
-  def need_relaunch(self):
-    """
-    Indicates if process need to be relaunched due to timeout or it is dead or 
even was not created.
-    :return: tuple (bool, str|None) with flag to relaunch and reason of 
-    """
-    if not self.worker_process or not self.worker_process.is_alive():
-      return True, "WORKER_DEAD"
-    elif self.timedOutEvent.is_set():
-      return True, "COMMAND_TIMEOUT"
-    return False, None
-  def relaunch(self, reason=None):
-    """
-    Restart status command executor internal process.
-    :param reason: reason of restart
-    :return:
-    """
-    with self.kill_lock:
-"Relaunching child process reason:" + str(reason))
-      if self.can_relaunch:
-        self.kill(reason)
-        self.worker_process = 
-        self.worker_process.start()
-"Started process with pid 
-      else:
-        logger.debug("Relaunch does not allowed, can not relaunch")
-  def kill(self, reason=None, can_relaunch=True):
-    """
-    Tries to stop command executor internal process for sort time, otherwise 
killing it. Closing all possible queues to
-    unblock threads that probably blocked on read or write operations to 
queues. Must be called from threads different
-    from threads that calling read or write methods(get_log_messages, 
get_results, put_commands).
-    :param can_relaunch: indicates if StatusCommandsExecutor can be relaunched 
after this kill
-    :param reason: reason of killing
-    :return:
-    """
-    with self.kill_lock:
-      self.can_relaunch = can_relaunch
-      if not self.can_relaunch:
-"Killing without possibility to relaunch...")
-      # try graceful stop, otherwise hard-kill
-      if self.worker_process and self.worker_process.is_alive():
-        self.mustDieEvent.set()
-        self.worker_process.join(timeout=3)
-        if self.worker_process.is_alive():
-          os.kill(, signal.SIGKILL)
-"Child process killed by -9")
-        else:
-          # get log messages only if we died gracefully, otherwise we will 
have chance to block here forever, in most cases
-          # this call will do nothing, as all logs will be processed in 
ActionQueue loop
-          self._process_logs()
-"Child process died gracefully")
-      else:
-"Child process already dead")
-      # close queues and acquire usage lock
-      # closing both sides of pipes here, we need this hack in case of 
blocking on recv() call
-      self.mp_result_queue.close()
-      self.mp_result_queue._writer.close()
-      self.mp_result_logs.close()
-      self.mp_result_logs._writer.close()
-      self.mp_task_queue.close()
-      self.mp_task_queue._writer.close()
-      with self.usage_lock:
-        self.mp_result_queue.join_thread()
-        self.mp_result_queue = multiprocessing.Queue()
-        self.mp_task_queue.join_thread()
-        self.mp_task_queue = multiprocessing.Queue()
-        self.mp_result_logs.join_thread()
-        self.mp_result_logs = multiprocessing.Queue()
-        self.customServiceOrchestrator = 
-        self.mustDieEvent.clear()
-        self.timedOutEvent.clear()
+# TODO make reliable MultiProcessStatusCommandsExecutor implementation
+MultiProcessStatusCommandsExecutor = SingleProcessStatusCommandsExecutor

