[04/34] ambari git commit: AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty)
AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5ef0c99a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5ef0c99a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5ef0c99a Branch: refs/heads/branch-feature-AMBARI-12556 Commit: 5ef0c99a9d477b63f4e7213d058c9ab2d3ac2feb Parents: ef34cb4 Author: Sumit Mohanty Authored: Wed Apr 12 12:35:15 2017 -0700 Committer: Sumit Mohanty Committed: Wed Apr 12 12:35:15 2017 -0700 -- .../ambari_agent/StatusCommandsExecutor.py | 279 +-- 1 file changed, 2 insertions(+), 277 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ambari/blob/5ef0c99a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py -- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py index 142e7ca..f42e134 100644 --- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -83,280 +83,5 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor): def kill(self, reason=None, can_relaunch=True): pass -class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): - def __init__(self, config, actionQueue): -self.config = config -self.actionQueue = actionQueue - -self.can_relaunch = True - -# used to prevent queues from been used during creation of new one to prevent threads messing up with combination of -# old and new queues -self.usage_lock = threading.RLock() -# protects against simultaneous killing/creating from different threads. -self.kill_lock = threading.RLock() - -self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) -self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator - -self.worker_process = None -self.mustDieEvent = multiprocessing.Event() -self.timedOutEvent = multiprocessing.Event() - -# multiprocessing stuff that need to be cleaned every time -self.mp_result_queue = multiprocessing.Queue() -self.mp_result_logs = multiprocessing.Queue() -self.mp_task_queue = multiprocessing.Queue() - - def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001): -""" -Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains -extremely dumb protection against blocking too much at this method: will try to get all possible items for not more -than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised -``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read -~4500 ``range(1,1)`` objects for 5 seconds. So don't fill queue too fast. - -:param target_queue: queue to read from -:param max_time: maximum time to spend in this method call -:param max_empty_count: maximum allowed ``Queue.Empty`` in a row -:param read_break: time to wait before next read cycle iteration -:return: list of resulting objects -""" -results = [] -_empty = 0 -_start = time.time() -with self.usage_lock: - try: -while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count: - try: -results.append(target_queue.get(False)) -_empty = 0 -time.sleep(read_break) # sleep a little to get more accurate empty and qsize results - except Queue.Empty: -_empty += 1 - except IOError: -pass - except UnicodeDecodeError: -pass - except IOError: -pass -return results - - def _log_message(self, level, message, exception=None): -""" -Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target). - -:param level: -:param message: -:param exception: -:return: -""" -result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message -self.mp_result_logs.put((level, result_message, exception)) - - def _process_logs(self): -""" -Get all available at this moment logs and prints them to logger. -""" -for level, message, exception in self._drain_queue(self.mp_result_logs): - if level == logging.ERROR: -logger.debug(message, exc_info=exception) - if level == logging.WARN: -logger.warn(message) -
ambari git commit: AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty)
Repository: ambari Updated Branches: refs/heads/branch-2.5 05b429714 -> e5940693b AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e5940693 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e5940693 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e5940693 Branch: refs/heads/branch-2.5 Commit: e5940693b968ba2cc0e84af3cce4a80ad013cfef Parents: 05b4297 Author: Sumit Mohanty Authored: Wed Apr 12 12:35:15 2017 -0700 Committer: Sumit Mohanty Committed: Wed Apr 12 12:38:00 2017 -0700 -- .../ambari_agent/StatusCommandsExecutor.py | 278 +-- 1 file changed, 1 insertion(+), 277 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ambari/blob/e5940693/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py -- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py index 5a8e4ce..b2a65ff 100644 --- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -83,280 +83,4 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor): def kill(self, reason=None, can_relaunch=True): pass -class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): - def __init__(self, config, actionQueue): -self.config = config -self.actionQueue = actionQueue - -self.can_relaunch = True - -# used to prevent queues from been used during creation of new one to prevent threads messing up with combination of -# old and new queues -self.usage_lock = threading.RLock() -# protects against simultaneous killing/creating from different threads. -self.kill_lock = threading.RLock() - -self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) -self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator - -self.worker_process = None -self.mustDieEvent = multiprocessing.Event() -self.timedOutEvent = multiprocessing.Event() - -# multiprocessing stuff that need to be cleaned every time -self.mp_result_queue = multiprocessing.Queue() -self.mp_result_logs = multiprocessing.Queue() -self.mp_task_queue = multiprocessing.Queue() - - def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001): -""" -Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains -extremely dumb protection against blocking too much at this method: will try to get all possible items for not more -than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised -``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read -~4500 ``range(1,1)`` objects for 5 seconds. So don't fill queue too fast. - -:param target_queue: queue to read from -:param max_time: maximum time to spend in this method call -:param max_empty_count: maximum allowed ``Queue.Empty`` in a row -:param read_break: time to wait before next read cycle iteration -:return: list of resulting objects -""" -results = [] -_empty = 0 -_start = time.time() -with self.usage_lock: - try: -while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count: - try: -results.append(target_queue.get(False)) -_empty = 0 -time.sleep(read_break) # sleep a little to get more accurate empty and qsize results - except Queue.Empty: -_empty += 1 - except IOError: -pass - except UnicodeDecodeError: -pass - except IOError: -pass -return results - - def _log_message(self, level, message, exception=None): -""" -Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target). - -:param level: -:param message: -:param exception: -:return: -""" -result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message -self.mp_result_logs.put((level, result_message, exception)) - - def _process_logs(self): -""" -Get all available at this moment logs and prints them to logger. -""" -for level, message, exception in self._drain_queue(self.mp_result_logs): - if level == logging.ERROR: -logger.debug(message, exc_info=excepti
ambari git commit: AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty)
Repository: ambari Updated Branches: refs/heads/trunk ef34cb4ee -> 5ef0c99a9 AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5ef0c99a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5ef0c99a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5ef0c99a Branch: refs/heads/trunk Commit: 5ef0c99a9d477b63f4e7213d058c9ab2d3ac2feb Parents: ef34cb4 Author: Sumit Mohanty Authored: Wed Apr 12 12:35:15 2017 -0700 Committer: Sumit Mohanty Committed: Wed Apr 12 12:35:15 2017 -0700 -- .../ambari_agent/StatusCommandsExecutor.py | 279 +-- 1 file changed, 2 insertions(+), 277 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ambari/blob/5ef0c99a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py -- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py index 142e7ca..f42e134 100644 --- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -83,280 +83,5 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor): def kill(self, reason=None, can_relaunch=True): pass -class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): - def __init__(self, config, actionQueue): -self.config = config -self.actionQueue = actionQueue - -self.can_relaunch = True - -# used to prevent queues from been used during creation of new one to prevent threads messing up with combination of -# old and new queues -self.usage_lock = threading.RLock() -# protects against simultaneous killing/creating from different threads. -self.kill_lock = threading.RLock() - -self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) -self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator - -self.worker_process = None -self.mustDieEvent = multiprocessing.Event() -self.timedOutEvent = multiprocessing.Event() - -# multiprocessing stuff that need to be cleaned every time -self.mp_result_queue = multiprocessing.Queue() -self.mp_result_logs = multiprocessing.Queue() -self.mp_task_queue = multiprocessing.Queue() - - def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001): -""" -Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains -extremely dumb protection against blocking too much at this method: will try to get all possible items for not more -than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised -``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read -~4500 ``range(1,1)`` objects for 5 seconds. So don't fill queue too fast. - -:param target_queue: queue to read from -:param max_time: maximum time to spend in this method call -:param max_empty_count: maximum allowed ``Queue.Empty`` in a row -:param read_break: time to wait before next read cycle iteration -:return: list of resulting objects -""" -results = [] -_empty = 0 -_start = time.time() -with self.usage_lock: - try: -while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count: - try: -results.append(target_queue.get(False)) -_empty = 0 -time.sleep(read_break) # sleep a little to get more accurate empty and qsize results - except Queue.Empty: -_empty += 1 - except IOError: -pass - except UnicodeDecodeError: -pass - except IOError: -pass -return results - - def _log_message(self, level, message, exception=None): -""" -Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target). - -:param level: -:param message: -:param exception: -:return: -""" -result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message -self.mp_result_logs.put((level, result_message, exception)) - - def _process_logs(self): -""" -Get all available at this moment logs and prints them to logger. -""" -for level, message, exception in self._drain_queue(self.mp_result_logs): - if level == logging.ERROR: -logger.debug(message, exc_info=exception) -