SLIDER-262. Slider agent should provide process supervision such as auto-restart.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/806ebb27 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/806ebb27 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/806ebb27 Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry Commit: 806ebb273f7b66027c771c2f13e2c8ef95cbe829 Parents: 506eb79 Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Wed Aug 13 07:53:31 2014 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Thu Aug 14 17:56:52 2014 -0700 ---------------------------------------------------------------------- .../src/main/python/agent/ActionQueue.py | 1 + .../src/main/python/agent/CommandStatusDict.py | 2 + .../src/main/python/agent/Controller.py | 11 +- .../src/test/python/agent/TestActionQueue.py | 7 +- .../test/python/agent/TestCommandStatusDict.py | 94 +++++++++++++++ .../src/test/python/agent/TestController.py | 116 ++++++++++++++++++- 6 files changed, 224 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/806ebb27/slider-agent/src/main/python/agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/ActionQueue.py b/slider-agent/src/main/python/agent/ActionQueue.py index 774d1e6..fba9aa2 100644 --- a/slider-agent/src/main/python/agent/ActionQueue.py +++ b/slider-agent/src/main/python/agent/ActionQueue.py @@ -135,6 +135,7 @@ class ActionQueue(threading.Thread): 'reportResult': reportResult }) self.commandStatuses.put_command_status(command, in_progress_status, reportResult) + store_config = False if ActionQueue.STORE_APPLIED_CONFIG in command['commandParams']: store_config = 'true' == command['commandParams'][ActionQueue.STORE_APPLIED_CONFIG] 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/806ebb27/slider-agent/src/main/python/agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CommandStatusDict.py b/slider-agent/src/main/python/agent/CommandStatusDict.py index 9261e29..f20e0fa 100644 --- a/slider-agent/src/main/python/agent/CommandStatusDict.py +++ b/slider-agent/src/main/python/agent/CommandStatusDict.py @@ -114,12 +114,14 @@ class CommandStatusDict(): grep = Grep() output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES) inprogress = self.generate_report_template(command) + reportResult = not command[Constants.AUTO_GENERATED] inprogress.update({ 'stdout': grep.filterMarkup(output), 'stderr': tmperr, 'structuredOut': tmpstructuredout, Constants.EXIT_CODE: 777, 'status': ActionQueue.IN_PROGRESS_STATUS, + 'reportResult': reportResult }) return inprogress http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/806ebb27/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 77dcafb..6bdfc26 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -268,7 +268,8 @@ class Controller(threading.Thread): if len(stored_command) > 0: auto_start_command = self.create_start_command(stored_command) if auto_start_command: - self.updateStateBasedOnCommand([auto_start_command]) + logger.info("Automatically adding a start command.") + self.updateStateBasedOnCommand([auto_start_command], False) self.addToQueue([auto_start_command]) pass @@ -343,21 +344,25 @@ class Controller(threading.Thread): pass logger.info("Controller stopped heart-beating.") + def create_start_command(self, stored_command): taskId = int(stored_command['taskId']) taskId = taskId + 1 
stored_command['taskId'] = taskId stored_command['commandId'] = "{0}-1".format(taskId) stored_command[Constants.AUTO_GENERATED] = True + return stored_command pass - def updateStateBasedOnCommand(self, commands): + + def updateStateBasedOnCommand(self, commands, createStatus=True): for command in commands: if command["roleCommand"] == "START": self.componentExpectedState = State.STARTED self.componentActualState = State.STARTING self.failureCount = 0 - self.statusCommand = self.createStatusCommand(command) + if createStatus: + self.statusCommand = self.createStatusCommand(command) if command["roleCommand"] == "INSTALL": self.componentExpectedState = State.INSTALLED http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/806ebb27/slider-agent/src/test/python/agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestActionQueue.py b/slider-agent/src/test/python/agent/TestActionQueue.py index f0935a9..4a53c2e 100644 --- a/slider-agent/src/test/python/agent/TestActionQueue.py +++ b/slider-agent/src/test/python/agent/TestActionQueue.py @@ -274,6 +274,7 @@ class TestActionQueue(TestCase): def test_execute_command(self, status_update_callback_mock, open_mock, json_load_mock, resolve_script_path_mock): + self.assertEqual.__self__.maxDiff = None tempdir = tempfile.gettempdir() config = MagicMock() config.get.return_value = "something" @@ -344,7 +345,8 @@ class TestActionQueue(TestCase): 'role': u'HBASE_MASTER', 'actionId': '1-1', 'taskId': 3, - 'exitcode': 777} + 'exitcode': 777, + 'reportResult': True} self.assertEqual(report['reports'][0], expected) # Continue command execution unfreeze_flag.set() @@ -406,7 +408,8 @@ class TestActionQueue(TestCase): 'actionId': '1-1', 'taskId': 3, 'structuredOut': '', - 'exitcode': 13} + 'exitcode': 13, + 'reportResult': True} self.assertEqual(len(report['reports']), 1) self.assertEqual(report['reports'][0], expected) 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/806ebb27/slider-agent/src/test/python/agent/TestCommandStatusDict.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestCommandStatusDict.py b/slider-agent/src/test/python/agent/TestCommandStatusDict.py
new file mode 100644
index 0000000..ee91de6
--- /dev/null
+++ b/slider-agent/src/test/python/agent/TestCommandStatusDict.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import StringIO
+import ssl
+import unittest, threading
+import sys
+from CommandStatusDict import CommandStatusDict
+from mock.mock import patch, MagicMock, call, Mock
+import logging
+from threading import Event
+
+class TestCommandStatusDict(unittest.TestCase):
+
+  logger = logging.getLogger()
+
+  auto_hbase_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'HBASE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 7,
+    "componentName": "HBASE_MASTER",
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'auto_generated': True
+  }
+
+  @patch("__builtin__.open")
+  def test_generate_progress_report(self, open_mock):
+    csd = CommandStatusDict(None)
+    report = {}
+    report['tmpout'] = None
+    report['tmperr'] = None
+    report['structuredOut'] = None
+
+    # Make file read calls visible
+    def open_side_effect(file, mode):
+      if mode == 'r':
+        file_mock = MagicMock()
+        file_mock.read.return_value = "Read from " + str(file)
+        return file_mock
+      else:
+        return self.original_open(file, mode)
+
+    open_mock.side_effect = open_side_effect
+
+    inprogress = csd.generate_in_progress_report(self.auto_hbase_install_command, report)
+    expected = {
+      'status': 'IN_PROGRESS',
+      'stderr': 'Read from None',
+      'stdout': 'Read from None',
+      'clusterName': u'cc',
+      'structuredOut': '{}',
+      'reportResult': False,
+      'roleCommand': u'INSTALL',
+      'serviceName': u'HDFS',
+      'role': u'HBASE',
+      'actionId': '1-1',
+      'taskId': 7,
+      'exitcode': 777}
+    self.assertEqual(inprogress, expected)
+
+    self.auto_hbase_install_command['auto_generated'] = False
+    inprogress = csd.generate_in_progress_report(self.auto_hbase_install_command, report)
+    expected['reportResult'] = True
+    self.assertEqual(inprogress, expected)
+    pass
+
+if __name__ == "__main__":
+  logging.basicConfig(format='%(asctime)s %(message)s',level=logging.DEBUG)
+  unittest.main()
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/806ebb27/slider-agent/src/test/python/agent/TestController.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestController.py b/slider-agent/src/test/python/agent/TestController.py index af62c67..91e12f1 100644 --- a/slider-agent/src/test/python/agent/TestController.py +++ b/slider-agent/src/test/python/agent/TestController.py @@ -42,7 +42,7 @@ class TestController(unittest.TestCase): @patch.object(hostname, "hostname") def setUp(self, hostname_method, NetUtil_mock, lockMock, threadMock): - Controller.logger = MagicMock() + #Controller.logger = MagicMock() lockMock.return_value = MagicMock() NetUtil_mock.return_value = MagicMock() hostname_method.return_value = "test_hostname" @@ -589,9 +589,121 @@ class TestController(unittest.TestCase): self.controller.config = original_value pass + def test_create_start_command(self): + stored_command = { + 'commandType': 'EXECUTION_COMMAND', + 'role': u'HBASE_MASTER', + "componentName": "HBASE_MASTER", + 'roleCommand': u'INSTALL', + 'commandId': '1-1', + 'taskId': 3, + 'clusterName': u'cc', + 'serviceName': u'HBASE', + 'configurations': {'global': {}}, + 'configurationTags': {'global': {'tag': 'v1'}}, + 'auto_generated': False, + 'roleParams': {'auto_restart':'false'}, + 'commandParams': {'script_type': 'PYTHON', + 'script': 'scripts/abc.py', + 'command_timeout': '600'} + } + + expected = { + 'commandType': 'EXECUTION_COMMAND', + 'role': u'HBASE_MASTER', + "componentName": "HBASE_MASTER", + 'roleCommand': u'INSTALL', + 'commandId': '4-1', + 'taskId': 4, + 'clusterName': u'cc', + 'serviceName': u'HBASE', + 'configurations': {'global': {}}, + 'configurationTags': {'global': {'tag': 'v1'}}, + 'auto_generated': False, + 'roleParams': {'auto_restart':'false'}, + 'commandParams': {'script_type': 'PYTHON', + 'script': 'scripts/abc.py', + 'command_timeout': '600'}, + 'auto_generated': True + } + + 
modified_command = self.controller.create_start_command(stored_command) + self.assertEqual.__self__.maxDiff = None + self.assertEqual(modified_command, expected) + + @patch.object(Controller.Controller, "createStatusCommand") + @patch.object(threading._Event, "wait") + @patch("time.sleep") + @patch("json.loads") + @patch("json.dumps") + def test_auto_start(self, dumpsMock, loadsMock, timeMock, waitMock, mock_createStatusCommand): + original_value = self.controller.config + self.controller.config = AgentConfig("", "") + out = StringIO.StringIO() + sys.stdout = out + + heartbeat = MagicMock() + self.controller.heartbeat = heartbeat + + dumpsMock.return_value = "data" + + sendRequest = MagicMock(name="sendRequest") + self.controller.sendRequest = sendRequest + + self.controller.responseId = 1 + response = {"responseId":"2", "restartAgent":"false", "restartEnabled":'true'} + loadsMock.return_value = response + + def one_heartbeat(*args, **kwargs): + self.controller.DEBUG_STOP_HEARTBEATING = True + return "data" + + sendRequest.side_effect = one_heartbeat + + actionQueue = MagicMock() + actionQueue.isIdle.return_value = True + + # one successful request, after stop + self.controller.actionQueue = actionQueue + self.controller.componentActualState = State.INSTALLED + self.controller.componentExpectedState = State.STARTED + self.assertTrue(self.controller.componentActualState, State.INSTALLED) + self.controller.actionQueue.customServiceOrchestrator.stored_command = { + 'commandType': 'EXECUTION_COMMAND', + 'role': u'HBASE', + 'roleCommand': u'START', + 'commandId': '7-1', + 'taskId': 7, + "componentName": "HBASE_MASTER", + 'clusterName': u'cc', + 'serviceName': u'HDFS' + } + addToQueue = MagicMock(name="addToQueue") + self.controller.addToQueue = addToQueue + + self.controller.heartbeatWithServer() + self.assertTrue(sendRequest.called) + + self.assertTrue(self.controller.componentActualState, State.STARTING) + self.assertTrue(self.controller.componentExpectedState, 
State.STARTED) + self.assertEquals(self.controller.failureCount, 0) + self.assertFalse(mock_createStatusCommand.called) + addToQueue.assert_has_calls([call([{ + 'commandType': 'EXECUTION_COMMAND', + 'clusterName': u'cc', + 'serviceName': u'HDFS', + 'role': u'HBASE', + 'taskId': 8, + 'roleCommand': u'START', + 'componentName': 'HBASE_MASTER', + 'commandId': '8-1', + 'auto_generated': True}])]) + self.controller.config = original_value + pass + if __name__ == "__main__": - logging.basicConfig(format='%(asctime)s %(message)s',level=logging.DEBUG) + logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG) unittest.main()