Repository: ambari Updated Branches: refs/heads/trunk 54e22bc94 -> e7fdb7146
AMBARI-11089. Auto recovery commands should not be scheduled till the execution commands are complete Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e7fdb714 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e7fdb714 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e7fdb714 Branch: refs/heads/trunk Commit: e7fdb7146bc278aafd3d9530b7c9f824e4716cb2 Parents: 54e22bc Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Tue May 12 19:55:20 2015 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Tue May 12 19:55:20 2015 -0700 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 30 +++++- .../python/ambari_agent/CommandStatusDict.py | 5 + .../src/main/python/ambari_agent/Controller.py | 2 +- .../main/python/ambari_agent/RecoveryManager.py | 14 +++ .../test/python/ambari_agent/TestActionQueue.py | 99 +++++++++++++++++++- .../python/ambari_agent/TestRecoveryManager.py | 14 ++- 6 files changed, 160 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index bdaefd0..59a5720 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -193,7 +193,13 @@ class ActionQueue(threading.Thread): logger.debug("Took an element of Queue (command type = %s)." % commandType) try: if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]: - self.execute_command(command) + try: + if self.controller.recovery_manager.enabled(): + self.controller.recovery_manager.start_execution_command() + self.execute_command(command) + finally: + if self.controller.recovery_manager.enabled(): + self.controller.recovery_manager.stop_execution_command() elif commandType == self.STATUS_COMMAND: self.execute_status_command(command) else: @@ -203,6 +209,15 @@ class ActionQueue(threading.Thread): traceback.print_exc() logger.warn(err) + def tasks_in_progress_or_pending(self): + return_val = False + if not self.commandQueue.empty(): + return_val = True + if self.controller.recovery_manager.has_active_command(): + return_val = True + return return_val + pass + def execute_command(self, command): ''' Executes commands of type EXECUTION_COMMAND @@ -248,6 +263,8 @@ class ActionQueue(threading.Thread): maxAttempts = int(command['commandParams']['command_retry_max_attempt_count']) if 'command_retry_enabled' in command['commandParams']: retryAble = command['commandParams']['command_retry_enabled'] == "true" + if isAutoExecuteCommand: + retryAble = False logger.debug("Command execution metadata - retry enabled = {retryAble}, max attempt count = {maxAttemptCount}". format(retryAble = retryAble, maxAttemptCount = maxAttempts)) @@ -296,6 +313,17 @@ class ActionQueue(threading.Thread): # let ambari know that configuration tags were applied if status == self.COMPLETED_STATUS: + if self.controller.recovery_manager.enabled() and command.has_key('roleCommand'): + if command['roleCommand'] == self.ROLE_COMMAND_START: + self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS) + elif command['roleCommand'] == self.ROLE_COMMAND_STOP or command['roleCommand'] == self.ROLE_COMMAND_INSTALL: + self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.DEAD_STATUS) + elif command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND: + if command['hostLevelParams'].has_key('custom_command') and \ + command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART: + self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS) + pass + configHandler = ActualConfigHandler(self.config, self.configTags) #update if command.has_key('forceRefreshConfigTags') and len(command['forceRefreshConfigTags']) > 0 : http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index 0ebc45e..861b568 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -78,6 +78,7 @@ class CommandStatusDict(): with self.lock: c = copy.copy(self.current_state[taskId][1]) return c + def generate_report(self): """ Generates status reports about commands that are IN_PROGRESS, COMPLETE or @@ -103,6 +104,10 @@ class CommandStatusDict(): resultComponentStatus.append(report) # Component status is useful once, removing it del self.current_state[key] + elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]: + logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId'])) + del self.current_state[key] + pass result = { 'reports': resultReports, 'componentStatus': resultComponentStatus http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 9ebb83a..ccd1233 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -283,7 +283,7 @@ class Controller(threading.Thread): self.recovery_manager.process_status_commands(response['statusCommands']) self.addToStatusQueue(response['statusCommands']) - if self.actionQueue.commandQueue.empty(): + if not self.actionQueue.tasks_in_progress_or_pending(): recovery_commands = self.recovery_manager.get_recovery_commands() for recovery_command in recovery_commands: logger.info("Adding recovery command %s for component %s", http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py index e2c5e98..12ba75d 100644 --- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py +++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py @@ -88,12 +88,26 @@ class RecoveryManager: self.statuses = {} self.__status_lock = threading.RLock() self.__command_lock = threading.RLock() + self.__active_command_lock = threading.RLock() + self.active_command_count = 0 self.paused = False self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only) pass + def start_execution_command(self): + with self.__active_command_lock: + self.active_command_count += 1 + pass + + def stop_execution_command(self): + with self.__active_command_lock: + self.active_command_count -= 1 + pass + + def has_active_command(self): + return self.active_command_count > 0 def set_paused(self, paused): if self.paused != paused: http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index ee18c38..7a834d8 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -67,6 +67,19 @@ class TestActionQueue(TestCase): 'configurationTags':{'global' : { 'tag': 'v1' }} } + datanode_auto_start_command = { + 'commandType': 'AUTO_EXECUTION_COMMAND', + 'role': u'DATANODE', + 'roleCommand': u'START', + 'commandId': '1-1', + 'taskId': 3, + 'clusterName': u'cc', + 'serviceName': u'HDFS', + 'hostLevelParams': {}, + 'configurations':{'global' : {}}, + 'configurationTags':{'global' : { 'tag': 'v1' }} + } + datanode_upgrade_command = { 'commandId': 17, 'role' : "role", @@ -311,6 +324,85 @@ class TestActionQueue(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("__builtin__.open") @patch.object(ActionQueue, "status_update_callback") + def test_auto_execute_command(self, status_update_callback_mock, open_mock): + # Make file read calls visible + def open_side_effect(file, mode): + if mode == 'r': + file_mock = MagicMock() + file_mock.read.return_value = "Read from " + str(file) + return file_mock + else: + return self.original_open(file, mode) + open_mock.side_effect = open_side_effect + + config = AmbariConfig() + tempdir = tempfile.gettempdir() + config.set('agent', 'prefix', tempdir) + config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") + config.set('agent', 'tolerate_download_failures', "true") + dummy_controller = MagicMock() + dummy_controller.recovery_manager = RecoveryManager() + dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False) + + actionQueue = ActionQueue(config, dummy_controller) + unfreeze_flag = threading.Event() + python_execution_result_dict = { + 'stdout': 'out', + 'stderr': 'stderr', + 'structuredOut' : '' + } + + def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): + unfreeze_flag.wait() + return python_execution_result_dict + def patched_aq_execute_command(command): + # We have to perform patching for separate thread in the same thread + with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: + runCommand_mock.side_effect = side_effect + actionQueue.process_command(command) + + python_execution_result_dict['status'] = 'COMPLETE' + python_execution_result_dict['exitcode'] = 0 + self.assertFalse(actionQueue.tasks_in_progress_or_pending()) + # We call method in a separate thread + execution_thread = Thread(target = patched_aq_execute_command , + args = (self.datanode_auto_start_command, )) + execution_thread.start() + # check in progress report + # wait until ready + while True: + time.sleep(0.1) + if actionQueue.tasks_in_progress_or_pending(): + break + # Continue command execution + unfreeze_flag.set() + # wait until ready + while actionQueue.tasks_in_progress_or_pending(): + time.sleep(0.1) + report = actionQueue.result() + + self.assertEqual(len(report['reports']), 0) + + ## Test failed execution + python_execution_result_dict['status'] = 'FAILED' + python_execution_result_dict['exitcode'] = 13 + # We call method in a separate thread + execution_thread = Thread(target = patched_aq_execute_command , + args = (self.datanode_auto_start_command, )) + execution_thread.start() + unfreeze_flag.set() + # check in progress report + # wait until ready + report = actionQueue.result() + while actionQueue.tasks_in_progress_or_pending(): + time.sleep(0.1) + report = actionQueue.result() + + self.assertEqual(len(report['reports']), 0) + + @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) + @patch("__builtin__.open") + @patch.object(ActionQueue, "status_update_callback") def test_execute_command(self, status_update_callback_mock, open_mock): # Make file read calls visible def open_side_effect(file, mode): @@ -371,7 +463,9 @@ class TestActionQueue(TestCase): 'taskId': 3, 'exitCode': 777} self.assertEqual(report['reports'][0], expected) - # Continue command execution + self.assertTrue(actionQueue.tasks_in_progress_or_pending()) + + # Continue command execution unfreeze_flag.set() # wait until ready while report['reports'][0]['status'] == 'IN_PROGRESS': @@ -631,6 +725,7 @@ class TestActionQueue(TestCase): get_mock, process_command_mock, gpeo_mock): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() + dummy_controller.recovery_manager = RecoveryManager() config = MagicMock() gpeo_mock.return_value = 0 config.get_parallel_exec_option = gpeo_mock @@ -638,8 +733,10 @@ class TestActionQueue(TestCase): actionQueue.start() actionQueue.put([self.datanode_install_command, self.hbase_install_command]) self.assertEqual(2, actionQueue.commandQueue.qsize()) + self.assertTrue(actionQueue.tasks_in_progress_or_pending()) actionQueue.reset() self.assertTrue(actionQueue.commandQueue.empty()) + self.assertFalse(actionQueue.tasks_in_progress_or_pending()) time.sleep(0.1) actionQueue.stop() actionQueue.join() http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py index bd7c96b..aaf6e53 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py +++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py @@ -507,4 +507,16 @@ class TestRecoveryManager(TestCase): commands = rm.get_recovery_commands() self.assertEqual(1, len(commands)) self.assertEqual("START", commands[0]["roleCommand"]) - pass \ No newline at end of file + pass + + def test_command_count(self): + rm = RecoveryManager(True) + self.assertFalse(rm.has_active_command()) + rm.start_execution_command() + self.assertTrue(rm.has_active_command()) + rm.start_execution_command() + self.assertTrue(rm.has_active_command()) + rm.stop_execution_command() + self.assertTrue(rm.has_active_command()) + rm.stop_execution_command() + self.assertFalse(rm.has_active_command()) \ No newline at end of file