AMBARI-20419. Add a property to enable running status commands in a separate process (non-default) (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/938fed1f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/938fed1f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/938fed1f Branch: refs/heads/branch-feature-AMBARI-12556 Commit: 938fed1f3b2b96ae8da174f9bd2133ca064197af Parents: 73a8633 Author: Andrew Onishuk <[email protected]> Authored: Mon Mar 13 16:32:30 2017 +0200 Committer: Andrew Onishuk <[email protected]> Committed: Mon Mar 13 16:32:30 2017 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 16 ++--- .../main/python/ambari_agent/AmbariConfig.py | 3 + .../src/main/python/ambari_agent/Controller.py | 7 +- .../ambari_agent/StatusCommandsExecutor.py | 75 +++++++++++++++++--- .../test/python/ambari_agent/TestActionQueue.py | 4 +- 5 files changed, 83 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 15ae03d..b1d5160 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -146,7 +146,7 @@ class ActionQueue(threading.Thread): try: while not self.stopped(): self.processBackgroundQueueSafeEmpty() - self.process_status_command_results() + self.controller.get_status_commands_executor().process_results() # process status commands try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -190,14 +190,6 @@ class ActionQueue(threading.Thread): except Queue.Empty: pass - def process_status_command_results(self): - 
self.controller.statusCommandsExecutor.process_logs() - for result in self.controller.statusCommandsExecutor.get_results(): - try: - self.process_status_command_result(result) - except UnicodeDecodeError: - pass - def createCommandHandle(self, command): if command.has_key('__handle'): raise AgentException("Command already has __handle") @@ -503,6 +495,12 @@ class ActionQueue(threading.Thread): self.commandStatuses.put_command_status(handle.command, roleResult) + def execute_status_command_and_security_status(self, command): + component_status_result = self.customServiceOrchestrator.requestComponentStatus(command) + component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(command) + + return command, component_status_result, component_security_status_result + def process_status_command_result(self, result): ''' Executes commands of type STATUS_COMMAND http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index 64c2643..1965dc2 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -304,6 +304,9 @@ class AmbariConfig: self.set('agent', self.ULIMIT_OPEN_FILES_KEY, value) + def get_multiprocess_status_commands_executor_enabled(self): + return bool(int(self.get('agent', 'multiprocess_status_commands_executor_enabled', 1))) + def update_configuration_from_registration(self, reg_resp): if reg_resp and AmbariConfig.AMBARI_PROPERTIES_CATEGORY in reg_resp: if not self.has_section(AmbariConfig.AMBARI_PROPERTIES_CATEGORY): http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/main/python/ambari_agent/Controller.py 
---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index c1a5f1b..29a11aa 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -48,7 +48,7 @@ from ambari_agent.ClusterConfiguration import ClusterConfiguration from ambari_agent.RecoveryManager import RecoveryManager from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers from ambari_agent.ExitHelper import ExitHelper -from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor +from ambari_agent.StatusCommandsExecutor import MultiProcessStatusCommandsExecutor, SingleProcessStatusCommandsExecutor from resource_management.libraries.functions.version import compare_versions from ambari_commons.os_utils import get_used_ram @@ -476,7 +476,10 @@ class Controller(threading.Thread): def run(self): try: self.actionQueue = ActionQueue(self.config, controller=self) - self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) + if self.config.get_multiprocess_status_commands_executor_enabled(): + self.statusCommandsExecutor = MultiProcessStatusCommandsExecutor(self.config, self.actionQueue) + else: + self.statusCommandsExecutor = SingleProcessStatusCommandsExecutor(self.config, self.actionQueue) ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING", can_relaunch=False) self.actionQueue.start() self.register = Register(self.config) http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py index 5c1c54a..3b23f1c 100644 --- 
a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -31,8 +31,56 @@ from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers logger = logging.getLogger(__name__) - class StatusCommandsExecutor(object): + def put_commands(self, commands): + raise NotImplemented() + + def process_results(self): + raise NotImplemented() + + def relaunch(self, reason=None): + raise NotImplemented() + + def kill(self, reason=None, can_relaunch=True): + raise NotImplemented() + +class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor): + def __init__(self, config, actionQueue): + self.config = config + self.actionQueue = actionQueue + self.statusCommandQueue = Queue.Queue() + self.need_relaunch = False + + def put_commands(self, commands): + while not self.statusCommandQueue.empty(): + self.statusCommandQueue.get() + + for command in commands: + logger.info("Adding " + command['commandType'] + " for component " + \ + command['componentName'] + " of service " + \ + command['serviceName'] + " of cluster " + \ + command['clusterName'] + " to the queue.") + self.statusCommandQueue.put(command) + logger.debug(pprint.pformat(command)) + + def process_results(self): + """ + Execute a single command from the queue and process it + """ + while not self.statusCommandQueue.empty(): + try: + command = self.statusCommandQueue.get(False) + self.actionQueue.process_status_command_result(self.actionQueue.execute_status_command_and_security_status(command)) + except Queue.Empty: + pass + + def relaunch(self, reason=None): + pass + + def kill(self, reason=None, can_relaunch=True): + pass + +class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor): def __init__(self, config, actionQueue): self.config = config self.actionQueue = actionQueue @@ -78,7 +126,7 @@ class StatusCommandsExecutor(object): result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message 
self.mp_result_logs.put((level, result_message, exception)) - def get_log_messages(self): + def _get_log_messages(self): """ Returns list of (level, message, exception) log messages. @@ -100,11 +148,11 @@ class StatusCommandsExecutor(object): pass return results - def process_logs(self): + def _process_logs(self): """ Get all available at this moment logs and prints them to logger. """ - for level, message, exception in self.get_log_messages(): + for level, message, exception in self._get_log_messages(): if level == logging.ERROR: logger.debug(message, exc_info=exception) if level == logging.WARN: @@ -129,9 +177,7 @@ class StatusCommandsExecutor(object): """ while True: _cmd = internal_in_queue.get() - component_status_result = self.customServiceOrchestrator.requestComponentStatus(_cmd) - component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(_cmd) - internal_out_queue.put((_cmd, component_status_result, component_security_status_result)) + internal_out_queue.put(self.actionQueue.execute_status_command_and_security_status(_cmd)) worker = threading.Thread(target=_internal_worker) worker.daemon = True @@ -219,7 +265,18 @@ class StatusCommandsExecutor(object): self.mp_task_queue.put(command) logger.debug(pprint.pformat(command)) - def get_results(self): + def process_results(self): + """ + Process all the results from the internal worker + """ + self._process_logs() + for result in self._get_results(): + try: + self.actionQueue.process_status_command_result(result) + except UnicodeDecodeError: + pass + + def _get_results(self): """ Get all available results for status commands. 
@@ -289,7 +346,7 @@ class StatusCommandsExecutor(object): else: # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases # this call will do nothing, as all logs will be processed in ActionQueue loop - self.process_logs() + self._process_logs() logger.info("Child process died gracefully") else: logger.info("Child process already dead") http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 8701a24..67f0833 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -1329,7 +1329,7 @@ class TestActionQueue(TestCase): execute_command = copy.deepcopy(self.background_command) actionQueue.put([execute_command]) actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.process_status_command_results(); + actionQueue.controller.statusCommandExecutor.process_results(); #assert that python execturor start self.assertTrue(runCommand_mock.called) @@ -1373,7 +1373,7 @@ class TestActionQueue(TestCase): None, command_complete_w) actionQueue.put([self.background_command]) actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.process_status_command_results(); + actionQueue.controller.statusCommandExecutor.process_results(); with lock: complete_done.wait(0.1)
