Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 40aeeb9c0 -> b512b26ae
AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f8ccb478 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f8ccb478 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f8ccb478 Branch: refs/heads/branch-2.5 Commit: f8ccb478902438c2d1bd43f182c8a7bd935b1088 Parents: 40aeeb9 Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Tue Nov 8 11:32:46 2016 +0200 Committer: Toader, Sebastian <stoa...@hortonworks.com> Committed: Wed Jan 18 09:41:54 2017 +0100 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 1 + .../src/main/python/ambari_agent/ActionQueue.py | 41 ++++----- .../src/main/python/ambari_agent/Controller.py | 10 +++ .../ambari_agent/PythonReflectiveExecutor.py | 20 +++-- .../ambari_agent/StatusCommandsExecutor.py | 91 ++++++++++++++++++++ .../src/main/python/ambari_agent/main.py | 21 ++++- .../test/python/ambari_agent/TestActionQueue.py | 76 ++++------------ .../test/python/ambari_agent/TestController.py | 1 + .../src/test/python/ambari_agent/TestMain.py | 3 +- 9 files changed, 175 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index f2c8846..56fa605 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -35,6 +35,7 @@ tolerate_download_failures=true run_as_user=root parallel_execution=0 alert_grace_period=5 +status_command_timeout=5 alert_kinit_timeout=14400000 system_resource_overrides=/etc/resource_overrides ; memory_threshold_soft_mb=400 
http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index a9567c4..aeae954 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -18,6 +18,7 @@ See the License for the specific language governing permissions and limitations under the License. ''' import Queue +import multiprocessing import logging import traceback @@ -74,7 +75,8 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() - self.statusCommandQueue = Queue.Queue() + self.statusCommandQueue = multiprocessing.Queue() + self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor. self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = self.status_update_callback) @@ -96,13 +98,9 @@ class ActionQueue(threading.Thread): return self._stop.isSet() def put_status(self, commands): - if not self.statusCommandQueue.empty(): - #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones - statusCommandQueueSize = 0 - while not self.statusCommandQueue.empty(): - self.statusCommandQueue.get() - statusCommandQueueSize = statusCommandQueueSize + 1 - logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize)) + #Clear all status commands. 
Was supposed that we got all set of statuses, we don't need to keep old ones + while not self.statusCommandQueue.empty(): + self.statusCommandQueue.get() for command in commands: logger.info("Adding " + command['commandType'] + " for component " + \ @@ -158,7 +156,7 @@ class ActionQueue(threading.Thread): try: while not self.stopped(): self.processBackgroundQueueSafeEmpty(); - self.processStatusCommandQueueSafeEmpty(); + self.processStatusCommandResultQueueSafeEmpty(); try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -202,14 +200,19 @@ class ActionQueue(threading.Thread): except Queue.Empty: pass - def processStatusCommandQueueSafeEmpty(self): - while not self.statusCommandQueue.empty(): + def processStatusCommandResultQueueSafeEmpty(self): + while not self.statusCommandResultQueue.empty(): try: - command = self.statusCommandQueue.get(False) - self.process_command(command) + result = self.statusCommandResultQueue.get(False) + self.process_status_command_result(result) except Queue.Empty: pass - + except IOError: + # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time. + # During queue.close IOError will be thrown (this prevents from permanently dead-locked get). 
+ pass + except UnicodeDecodeError: + pass def createCommandHandle(self, command): if command.has_key('__handle'): @@ -230,8 +233,6 @@ class ActionQueue(threading.Thread): finally: if self.controller.recovery_manager.enabled(): self.controller.recovery_manager.stop_execution_command() - elif commandType == self.STATUS_COMMAND: - self.execute_status_command(command) else: logger.error("Unrecognized command " + pprint.pformat(command)) except Exception: @@ -518,11 +519,12 @@ class ActionQueue(threading.Thread): self.commandStatuses.put_command_status(handle.command, roleResult) - def execute_status_command(self, command): + def process_status_command_result(self, result): ''' Executes commands of type STATUS_COMMAND ''' try: + command, component_status_result, component_security_status_result = result cluster = command['clusterName'] service = command['serviceName'] component = command['componentName'] @@ -537,11 +539,6 @@ class ActionQueue(threading.Thread): component_extra = None - # For custom services, responsibility to determine service status is - # delegated to python scripts - component_status_result = self.customServiceOrchestrator.requestComponentStatus(command) - component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(command) - if component_status_result['exitcode'] == 0: component_status = LiveStatus.LIVE_STATUS if self.controller.recovery_manager.enabled() \ http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index beeaad9..f6296d8 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -39,6 +39,7 @@ import AmbariConfig from ambari_agent.Heartbeat import Heartbeat from 
ambari_agent.Register import Register from ambari_agent.ActionQueue import ActionQueue +from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor from ambari_agent.FileCache import FileCache from ambari_agent.NetUtil import NetUtil from ambari_agent.LiveStatus import LiveStatus @@ -83,6 +84,7 @@ class Controller(threading.Thread): self.cachedconnect = None self.max_reconnect_retry_delay = int(config.get('server','max_reconnect_retry_delay', default=30)) self.hasMappedComponents = True + self.statusCommandsExecutor = None # Event is used for synchronizing heartbeat iterations (to make possible # manual wait() interruption between heartbeats ) self.heartbeat_stop_callback = heartbeat_stop_callback @@ -448,10 +450,18 @@ class Controller(threading.Thread): logger.info("Stop event received") self.DEBUG_STOP_HEARTBEATING=True + def spawnStatusCommandsExecutorProcess(self): + self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) + self.statusCommandsExecutor.start() + + def getStatusCommandsExecutor(self): + return self.statusCommandsExecutor + def run(self): try: self.actionQueue = ActionQueue(self.config, controller=self) self.actionQueue.start() + self.spawnStatusCommandsExecutorProcess() self.register = Register(self.config) self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector()) http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py index 655b2fc..b27d7d1 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py @@ -53,7 +53,9 @@ class PythonReflectiveExecutor(PythonExecutor): returncode 
= 1 try: - with PythonContext(script_dir, pythonCommand): + current_context = PythonContext(script_dir, pythonCommand) + PythonReflectiveExecutor.last_context = current_context + with current_context: imp.load_source('__main__', script) except SystemExit as e: returncode = e.code @@ -76,6 +78,8 @@ class PythonContext: def __init__(self, script_dir, pythonCommand): self.script_dir = script_dir self.pythonCommand = pythonCommand + self.is_reverted = False + self.is_forced_revert = False def __enter__(self): self.old_sys_path = copy.copy(sys.path) @@ -88,12 +92,18 @@ class PythonContext: sys.argv = self.pythonCommand[1:] def __exit__(self, exc_type, exc_val, exc_tb): - sys.path = self.old_sys_path - sys.argv = self.old_agv - logging.disable(self.old_logging_disable) - self.revert_sys_modules(self.old_sys_modules) + self.revert(is_forced_revert=False) return False + def revert(self, is_forced_revert=True): + if not self.is_reverted: + self.is_forced_revert = is_forced_revert + self.is_reverted = True + sys.path = self.old_sys_path + sys.argv = self.old_agv + logging.disable(self.old_logging_disable) + self.revert_sys_modules(self.old_sys_modules) + def revert_sys_modules(self, value): sys.modules.update(value) http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py new file mode 100644 index 0000000..5d3607a --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import os +import signal +import threading +import logging +import multiprocessing +from ambari_agent.PythonReflectiveExecutor import PythonReflectiveExecutor +from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers + +logger = logging.getLogger(__name__) + +class StatusCommandsExecutor(multiprocessing.Process): + """ + A process which executes status/security status commands. + + It dies and respawns itself on timeout of the command. Which is the most graceful way to end the currently running status command. 
+ """ + def __init__(self, config, actionQueue): + multiprocessing.Process.__init__(self) + + self.config = config + self.actionQueue = actionQueue + + self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) # in seconds + self.hasTimeoutedEvent = multiprocessing.Event() + + def run(self): + try: + bind_debug_signal_handlers() + while True: + command = self.actionQueue.statusCommandQueue.get(True) # blocks until status status command appears + logger.debug("Running status command for {0}".format(command['componentName'])) + + timeout_timer = threading.Timer( self.status_command_timeout, self.respawn, [command]) + timeout_timer.start() + + self.process_status_command(command) + + timeout_timer.cancel() + logger.debug("Completed status command for {0}".format(command['componentName'])) + except: + logger.exception("StatusCommandsExecutor process failed with exception:") + raise + + logger.warn("StatusCommandsExecutor process has finished") + + def process_status_command(self, command): + component_status_result = self.actionQueue.customServiceOrchestrator.requestComponentStatus(command) + component_security_status_result = self.actionQueue.customServiceOrchestrator.requestComponentSecurityState(command) + result = (command, component_status_result, component_security_status_result) + + self.actionQueue.statusCommandResultQueue.put(result) + + def respawn(self, command): + try: + if hasattr(PythonReflectiveExecutor, "last_context"): + # Force context to reset to normal. By context we mean sys.path, imports, etc. They are set by specific status command, and are not relevant to ambari-agent. + PythonReflectiveExecutor.last_context.revert() + + logger.warn("Command {0} for {1} is running for more than {2} seconds. 
Terminating it due to timeout.".format(command['commandType'], command['componentName'], self.status_command_timeout)) + + self.hasTimeoutedEvent.set() + except: + logger.exception("StatusCommandsExecutor.finish thread failed with exception:") + raise + + def kill(self): + os.kill(self.pid, signal.SIGKILL) + + # prevent queue from ending up with non-freed semaphores, locks during put. Which would result in dead-lock in process executing get. + self.actionQueue.statusCommandResultQueue.close() + self.actionQueue.statusCommandResultQueue.join_thread() + self.actionQueue.statusCommandResultQueue = multiprocessing.Queue() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 4eb478a..968b828 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -284,6 +284,22 @@ def reset_agent(options): MAX_RETRIES = 10 +def run_threads(server_hostname, heartbeat_stop_callback): + # Launch Controller communication + controller = Controller(config, server_hostname, heartbeat_stop_callback) + controller.start() + while controller.is_alive(): + time.sleep(0.1) + + if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()): + if controller.getStatusCommandsExecutor().is_alive(): + logger.info("Terminating statusCommandsExecutor") + controller.getStatusCommandsExecutor().kill() + logger.info("Respawning statusCommandsExecutor") + controller.spawnStatusCommandsExecutorProcess() + + controller.getStatusCommandsExecutor().kill() + # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form 
outside process # we need this for windows os, where no sigterm available def main(heartbeat_stop_callback=None): @@ -388,10 +404,7 @@ def main(heartbeat_stop_callback=None): # Set the active server active_server = server_hostname # Launch Controller communication - controller = Controller(config, server_hostname, heartbeat_stop_callback) - controller.start() - while controller.is_alive(): - time.sleep(0.1) + run_threads(server_hostname, heartbeat_stop_callback) # # If Ambari Agent connected to the server or http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 6a9bad1..d4f5436 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -325,9 +325,7 @@ class TestActionQueue(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("logging.RootLogger.exception") @patch.object(ActionQueue, "execute_command") - @patch.object(ActionQueue, "execute_status_command") - def test_process_command(self, execute_status_command_mock, - execute_command_mock, log_exc_mock): + def test_process_command(self, execute_command_mock, log_exc_mock): dummy_controller = MagicMock() config = AmbariConfig() config.set('agent', 'tolerate_download_failures', "true") @@ -344,29 +342,19 @@ class TestActionQueue(TestCase): # Try wrong command actionQueue.process_command(wrong_command) self.assertFalse(execute_command_mock.called) - self.assertFalse(execute_status_command_mock.called) self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() - execute_status_command_mock.reset_mock() log_exc_mock.reset_mock() # Try normal execution 
actionQueue.process_command(execution_command) self.assertTrue(execute_command_mock.called) - self.assertFalse(execute_status_command_mock.called) self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() - execute_status_command_mock.reset_mock() log_exc_mock.reset_mock() - actionQueue.process_command(status_command) - self.assertFalse(execute_command_mock.called) - self.assertTrue(execute_status_command_mock.called) - self.assertFalse(log_exc_mock.called) - execute_command_mock.reset_mock() - execute_status_command_mock.reset_mock() log_exc_mock.reset_mock() # Try exception to check proper logging @@ -378,7 +366,6 @@ class TestActionQueue(TestCase): log_exc_mock.reset_mock() - execute_status_command_mock.side_effect = side_effect actionQueue.process_command(execution_command) self.assertTrue(log_exc_mock.called) @@ -944,14 +931,11 @@ class TestActionQueue(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") - @patch.object(CustomServiceOrchestrator, "requestComponentStatus") - @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") def test_execute_status_command(self, CustomServiceOrchestrator_mock, - build_mock, execute_command_mock, requestComponentSecurityState_mock, - requestComponentStatus_mock, + build_mock, execute_command_mock, status_update_callback): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() @@ -961,33 +945,25 @@ class TestActionQueue(TestCase): dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) - requestComponentStatus_mock.reset_mock() - requestComponentStatus_mock.return_value = {'exitcode': 0 } + result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') - requestComponentSecurityState_mock.reset_mock() - 
requestComponentSecurityState_mock.return_value = 'UNKNOWN' - - actionQueue.execute_status_command(self.status_command) + actionQueue.process_status_command_result(result) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN'} self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) - self.assertTrue(requestComponentStatus_mock.called) @patch.object(RecoveryManager, "command_exists") @patch.object(RecoveryManager, "requires_recovery") @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") - @patch.object(CustomServiceOrchestrator, "requestComponentStatus") - @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_status_command_recovery(self, CustomServiceOrchestrator_mock, - build_mock, execute_command_mock, requestComponentSecurityState_mock, - requestComponentStatus_mock, + def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock, + build_mock, execute_command_mock, status_update_callback, requires_recovery_mock, command_exists_mock): CustomServiceOrchestrator_mock.return_value = None @@ -1000,13 +976,9 @@ class TestActionQueue(TestCase): dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False) - requestComponentStatus_mock.reset_mock() - requestComponentStatus_mock.return_value = {'exitcode': 0 } - - requestComponentSecurityState_mock.reset_mock() - requestComponentSecurityState_mock.return_value = 'UNKNOWN' + result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') - actionQueue.execute_status_command(self.status_command) + actionQueue.process_status_command_result(result) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN', 
@@ -1014,17 +986,13 @@ class TestActionQueue(TestCase): self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) - self.assertTrue(requestComponentStatus_mock.called) requires_recovery_mock.return_value = True command_exists_mock.return_value = True - requestComponentStatus_mock.reset_mock() - requestComponentStatus_mock.return_value = {'exitcode': 0 } - - requestComponentSecurityState_mock.reset_mock() - requestComponentSecurityState_mock.return_value = 'UNKNOWN' + + result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') - actionQueue.execute_status_command(self.status_command) + actionQueue.process_status_command_result(result) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN', @@ -1032,39 +1000,33 @@ class TestActionQueue(TestCase): self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) - self.assertTrue(requestComponentStatus_mock.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") - @patch.object(CustomServiceOrchestrator, "requestComponentStatus") - @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock, - requestComponentSecurityState_mock, + def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock, build_mock, execute_command_mock, - requestComponentStatus_mock, status_update_callback): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - - - requestComponentStatus_mock.reset_mock() - requestComponentStatus_mock.return_value = { + command_return_value = { 
'exitcode': 0, 'stdout': 'out', 'stderr': 'err', 'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] } } + + result = (self.status_command_for_alerts, command_return_value, command_return_value) + build_mock.return_value = {'somestatusresult': 'aresult'} - actionQueue.execute_status_command(self.status_command_for_alerts) + actionQueue.process_status_command_result(result) report = actionQueue.result() - self.assertTrue(requestComponentStatus_mock.called) self.assertEqual(len(report['componentStatus']), 1) self.assertTrue(report['componentStatus'][0].has_key('alerts')) @@ -1324,7 +1286,7 @@ class TestActionQueue(TestCase): execute_command = copy.deepcopy(self.background_command) actionQueue.put([execute_command]) actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.processStatusCommandQueueSafeEmpty(); + actionQueue.processStatusCommandResultQueueSafeEmpty(); #assert that python execturor start self.assertTrue(runCommand_mock.called) @@ -1368,7 +1330,7 @@ class TestActionQueue(TestCase): None, command_complete_w) actionQueue.put([self.background_command]) actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.processStatusCommandQueueSafeEmpty(); + actionQueue.processStatusCommandResultQueueSafeEmpty(); with lock: complete_done.wait(0.1) http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index 59b41cd..b47af03 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -44,6 +44,7 @@ import ambari_commons @not_for_platform(PLATFORM_WINDOWS) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) +@patch.object(Controller.Controller, 
"spawnStatusCommandsExecutorProcess", new = MagicMock()) class TestController(unittest.TestCase): logger = logging.getLogger() http://git-wip-us.apache.org/repos/asf/ambari/blob/f8ccb478/ambari-agent/src/test/python/ambari_agent/TestMain.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py index 400241f..998b778 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestMain.py +++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py @@ -324,6 +324,7 @@ class TestMain(unittest.TestCase): @patch.object(Controller, "__init__") @patch.object(Controller, "is_alive") @patch.object(Controller, "start") + @patch.object(Controller, "getStatusCommandsExecutor") @patch("optparse.OptionParser.parse_args") @patch.object(DataCleaner,"start") @patch.object(DataCleaner,"__init__") @@ -332,7 +333,7 @@ class TestMain(unittest.TestCase): @patch.object(ExitHelper,"execute_cleanup") @patch.object(ExitHelper, "exit") def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock, - parse_args_mock, start_mock, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, + parse_args_mock, start_mock, Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, update_log_level_mock, daemonize_mock, perform_prestart_checks_mock, ambari_config_mock, stop_mock, bind_signal_handlers_mock,