SLIDER-323. Allocated ports must be reported back upon AM restart. SLIDER-262. Slider agent should provide process supervision such as auto-restart.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/47822dff Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/47822dff Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/47822dff Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry Commit: 47822dffa3e086c198fed820ec1e4b099a453659 Parents: 18b0545 Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Thu Aug 14 17:36:59 2014 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Thu Aug 14 17:57:43 2014 -0700 ---------------------------------------------------------------------- .../src/main/python/agent/ActionQueue.py | 11 ++- .../src/main/python/agent/CommandStatusDict.py | 5 +- .../src/main/python/agent/Controller.py | 27 ++++--- .../python/agent/CustomServiceOrchestrator.py | 8 +- slider-agent/src/main/python/agent/Heartbeat.py | 9 ++- slider-agent/src/main/python/agent/Register.py | 7 +- .../src/test/python/agent/TestActionQueue.py | 2 - .../src/test/python/agent/TestController.py | 29 +++++-- .../agent/TestCustomServiceOrchestrator.py | 2 + .../src/test/python/agent/TestHeartbeat.py | 82 +++++++++++++++++++- .../src/test/python/agent/TestRegistration.py | 8 +- .../providers/agent/AgentProviderService.java | 51 +++++++++--- .../appmaster/web/rest/agent/HeartBeat.java | 10 --- .../appmaster/web/rest/agent/Register.java | 58 ++++++++++---- .../agent/TestAgentProviderService.java | 17 ++++ 15 files changed, 254 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/ActionQueue.py b/slider-agent/src/main/python/agent/ActionQueue.py index fba9aa2..4c45a76 100644 --- 
a/slider-agent/src/main/python/agent/ActionQueue.py +++ b/slider-agent/src/main/python/agent/ActionQueue.py @@ -122,7 +122,8 @@ class ActionQueue(threading.Thread): taskId = command['taskId'] - reportResult = not command[Constants.AUTO_GENERATED] + # if auto generated then do not report result + reportResult = CommandStatusDict.shouldReportResult(command) # Preparing 'IN_PROGRESS' report in_progress_status = self.commandStatuses.generate_report_template(command) @@ -140,7 +141,8 @@ class ActionQueue(threading.Thread): if ActionQueue.STORE_APPLIED_CONFIG in command['commandParams']: store_config = 'true' == command['commandParams'][ActionQueue.STORE_APPLIED_CONFIG] store_command = False - if ActionQueue.AUTO_RESTART in command['roleParams']: + if 'roleParams' in command and ActionQueue.AUTO_RESTART in command['roleParams']: + logger.info("Component has indicated auto-restart. Saving details from START command.") store_command = 'true' == command['roleParams'][ActionQueue.AUTO_RESTART] @@ -195,10 +197,7 @@ class ActionQueue(threading.Thread): cluster = command['clusterName'] service = command['serviceName'] component = command['componentName'] - reportResult = True - if Constants.AUTO_GENERATED in command: - reportResult = not command[Constants.AUTO_GENERATED] - + reportResult = CommandStatusDict.shouldReportResult(command) component_status = self.customServiceOrchestrator.requestComponentStatus(command) result = {"componentName": component, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CommandStatusDict.py b/slider-agent/src/main/python/agent/CommandStatusDict.py index f20e0fa..bcbce9e 100644 --- a/slider-agent/src/main/python/agent/CommandStatusDict.py +++ b/slider-agent/src/main/python/agent/CommandStatusDict.py @@ -114,7 +114,7 @@ class CommandStatusDict(): grep = 
Grep() output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES) inprogress = self.generate_report_template(command) - reportResult = not command[Constants.AUTO_GENERATED] + reportResult = CommandStatusDict.shouldReportResult(command) inprogress.update({ 'stdout': grep.filterMarkup(output), 'stderr': tmperr, @@ -142,3 +142,6 @@ class CommandStatusDict(): return stub + @staticmethod + def shouldReportResult(command): + return not (Constants.AUTO_GENERATED in command and command[Constants.AUTO_GENERATED]) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 6bdfc26..1e27efa 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -118,7 +118,11 @@ class Controller(threading.Thread): while not self.isRegistered: try: - data = json.dumps(self.register.build(id)) + data = json.dumps(self.register.build( + self.componentActualState, + self.componentExpectedState, + self.actionQueue.customServiceOrchestrator.allocated_ports, + id)) logger.info("Registering with the server at " + self.registerUrl + " with data " + pprint.pformat(data)) response = self.sendRequest(self.registerUrl, data) @@ -211,7 +215,7 @@ class Controller(threading.Thread): try: if not retry: data = json.dumps( - self.heartbeat.build(commandResult, self.componentActualState, + self.heartbeat.build(commandResult, self.responseId, self.hasMappedComponents)) self.updateStateBasedOnResult(commandResult) logger.debug("Sending request: " + data) @@ -225,8 +229,11 @@ class Controller(threading.Thread): serverId = int(response['responseId']) + restartEnabled = False if 'restartEnabled' in response: - restartEnabled = 'true' == response['restartEnabled'] + restartEnabled = response['restartEnabled'] + if 
restartEnabled: + logger.info("Component auto-restart is enabled.") if 'hasMappedComponents' in response.keys(): self.hasMappedComponents = response['hasMappedComponents'] != False @@ -262,15 +269,16 @@ class Controller(threading.Thread): pass # Add a start command - if self.componentActualState == State.INSTALLED and \ + if self.componentActualState == State.FAILED and \ self.componentExpectedState == State.STARTED and restartEnabled: stored_command = self.actionQueue.customServiceOrchestrator.stored_command if len(stored_command) > 0: - auto_start_command = self.create_start_command(stored_command) - if auto_start_command: - logger.info("Automatically adding a start command.") - self.updateStateBasedOnCommand([auto_start_command], False) - self.addToQueue([auto_start_command]) + auto_start_command = self.create_start_command(stored_command) + if auto_start_command: + logger.info("Automatically adding a start command.") + logger.debug("Auto start command: " + pprint.pformat(auto_start_command)) + self.updateStateBasedOnCommand([auto_start_command], False) + self.addToQueue([auto_start_command]) pass # Add a status command @@ -388,6 +396,7 @@ class Controller(threading.Thread): if "healthStatus" in commandResult: if commandResult["healthStatus"] == "INSTALLED": + # Mark it FAILED as its a failure remedied by auto-start or container restart self.componentActualState = State.FAILED self.failureCount += 1 self.logStates() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py index 6b2ace5..15f1664 100644 --- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py +++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py @@ -57,6 +57,7 @@ class 
CustomServiceOrchestrator(): 'status_command_stderr.txt')) self.public_fqdn = hostname.public_hostname() self.stored_command = {} + self.allocated_ports = {} # Clean up old status command files if any try: os.unlink(self.status_commands_stdout) @@ -69,7 +70,7 @@ class CustomServiceOrchestrator(): def runCommand(self, command, tmpoutfile, tmperrfile, override_output_files=True, store_command=False): - allocated_port = {} + allocated_ports = {} try: script_type = command['commandParams']['script_type'] script = command['commandParams']['script'] @@ -86,7 +87,7 @@ class CustomServiceOrchestrator(): # We don't support anything else yet message = "Unknown script type {0}".format(script_type) raise AgentException(message) - json_path = self.dump_command_to_json(command, allocated_port, store_command) + json_path = self.dump_command_to_json(command, allocated_ports, store_command) py_file_list = [script_tuple] # filter None values filtered_py_file_list = [i for i in py_file_list if i] @@ -132,7 +133,8 @@ class CustomServiceOrchestrator(): } if Constants.EXIT_CODE in ret and ret[Constants.EXIT_CODE] == 0: - ret[Constants.ALLOCATED_PORTS] = allocated_port + ret[Constants.ALLOCATED_PORTS] = allocated_ports + self.allocated_ports = allocated_ports # Irrespective of the outcome report the folder paths if command_name == 'INSTALL': http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/Heartbeat.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Heartbeat.py b/slider-agent/src/main/python/agent/Heartbeat.py index aa403d4..b107d92 100644 --- a/slider-agent/src/main/python/agent/Heartbeat.py +++ b/slider-agent/src/main/python/agent/Heartbeat.py @@ -36,7 +36,7 @@ class Heartbeat: self.config = config self.reports = [] - def build(self, commandResult, componentActualState, id='-1', + def build(self, commandResult, id='-1', componentsMapped=False): timestamp = 
int(time.time() * 1000) queueResult = self.actionQueue.result() @@ -49,8 +49,7 @@ class Heartbeat: 'timestamp': timestamp, 'hostname': self.config.getLabel(), 'nodeStatus': nodeStatus, - 'fqdn': hostname.public_hostname(), - 'agentState': componentActualState + 'fqdn': hostname.public_hostname() } commandsInProgress = False @@ -62,6 +61,10 @@ class Heartbeat: if report['reportResult']: del report['reportResult'] heartbeat['reports'].append(report) + else: + # dropping the result but only recording the status + commandResult["commandStatus"] = report["status"] + pass if len(heartbeat['reports']) > 0: # There may be IN_PROGRESS tasks commandsInProgress = True http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/Register.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Register.py b/slider-agent/src/main/python/agent/Register.py index 7c7ff06..b59154f 100644 --- a/slider-agent/src/main/python/agent/Register.py +++ b/slider-agent/src/main/python/agent/Register.py @@ -29,7 +29,7 @@ class Register: def __init__(self, config): self.config = config - def build(self, id='-1'): + def build(self, actualState, expectedState, allocated_ports, id='-1'): timestamp = int(time.time() * 1000) version = self.read_agent_version() @@ -38,7 +38,10 @@ class Register: 'timestamp': timestamp, 'hostname': self.config.getLabel(), 'publicHostname': hostname.public_hostname(), - 'agentVersion': version + 'agentVersion': version, + 'actualState': actualState, + 'expectedState': expectedState, + 'allocatedPorts': allocated_ports } return register http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestActionQueue.py b/slider-agent/src/test/python/agent/TestActionQueue.py index 
4a53c2e..8071ee8 100644 --- a/slider-agent/src/test/python/agent/TestActionQueue.py +++ b/slider-agent/src/test/python/agent/TestActionQueue.py @@ -61,8 +61,6 @@ class TestActionQueue(TestCase): 'serviceName': u'HBASE', 'configurations': {'global': {}}, 'configurationTags': {'global': {'tag': 'v1'}}, - 'auto_generated': False, - 'roleParams': {'auto_restart':'false'}, 'commandParams': {'script_type': 'PYTHON', 'script': 'scripts/abc.py', 'command_timeout': '600'} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestController.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestController.py b/slider-agent/src/test/python/agent/TestController.py index 91e12f1..401d69a 100644 --- a/slider-agent/src/test/python/agent/TestController.py +++ b/slider-agent/src/test/python/agent/TestController.py @@ -55,6 +55,7 @@ class TestController(unittest.TestCase): self.controller = Controller.Controller(config) self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1 self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1 + self.controller.actionQueue = ActionQueue.ActionQueue(config, self.controller) @patch("json.dumps") @@ -289,7 +290,7 @@ class TestController(unittest.TestCase): self.controller.sendRequest = sendRequest self.controller.responseId = 1 - response = {"responseId":"2", "restartAgent":"false"} + response = {"responseId":"2", "restartAgent": False} loadsMock.return_value = response def one_heartbeat(*args, **kwargs): @@ -651,8 +652,9 @@ class TestController(unittest.TestCase): self.controller.sendRequest = sendRequest self.controller.responseId = 1 - response = {"responseId":"2", "restartAgent":"false", "restartEnabled":'true'} - loadsMock.return_value = response + response1 = {"responseId": "2", "restartAgent": False, "restartEnabled": True} + response2 = {"responseId": "2", "restartAgent": False, "restartEnabled": False} + 
loadsMock.side_effect = [response1, response2, response1] def one_heartbeat(*args, **kwargs): self.controller.DEBUG_STOP_HEARTBEATING = True @@ -665,9 +667,9 @@ class TestController(unittest.TestCase): # one successful request, after stop self.controller.actionQueue = actionQueue - self.controller.componentActualState = State.INSTALLED + self.controller.componentActualState = State.FAILED self.controller.componentExpectedState = State.STARTED - self.assertTrue(self.controller.componentActualState, State.INSTALLED) + self.assertTrue(self.controller.componentActualState, State.FAILED) self.controller.actionQueue.customServiceOrchestrator.stored_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'HBASE', @@ -699,6 +701,23 @@ class TestController(unittest.TestCase): 'commandId': '8-1', 'auto_generated': True}])]) self.controller.config = original_value + + # restartEnabled = False + self.controller.componentActualState = State.FAILED + self.controller.heartbeatWithServer() + + self.assertTrue(sendRequest.called) + self.assertTrue(self.controller.componentActualState, State.FAILED) + self.assertTrue(self.controller.componentExpectedState, State.STARTED) + + # restartEnabled = True + self.controller.componentActualState = State.INSTALLED + self.controller.componentExpectedState = State.INSTALLED + self.controller.heartbeatWithServer() + + self.assertTrue(sendRequest.called) + self.assertTrue(self.controller.componentActualState, State.INSTALLED) + self.assertTrue(self.controller.componentExpectedState, State.INSTALLED) pass http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py index 6408809..e545afe 100644 --- 
a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py +++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py @@ -230,6 +230,8 @@ class TestCustomServiceOrchestrator(TestCase): self.assertEqual(ret['allocated_ports'], {'a.a.port': '10233'}) self.assertTrue(run_file_mock.called) self.assertEqual(run_file_mock.call_count, 1) + self.assertEqual(orchestrator.allocated_ports, {'a.a.port': '10233'}) + self.assertEqual(orchestrator.stored_command, {}) @patch.object(socket, "close") http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestHeartbeat.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestHeartbeat.py b/slider-agent/src/test/python/agent/TestHeartbeat.py index 70870ec..b012218 100644 --- a/slider-agent/src/test/python/agent/TestHeartbeat.py +++ b/slider-agent/src/test/python/agent/TestHeartbeat.py @@ -50,7 +50,7 @@ class TestHeartbeat(TestCase): dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) heartbeat = Heartbeat(actionQueue, config) - result = heartbeat.build({}, State.STARTED, 100) + result = heartbeat.build({}, 100) print "Heartbeat: " + str(result) self.assertEquals(result['hostname'] != '', True, "hostname should not be empty") @@ -63,7 +63,6 @@ class TestHeartbeat(TestCase): self.assertEquals(len(result['nodeStatus']), 2) self.assertEquals(result['nodeStatus']['cause'], "NONE") self.assertEquals(result['nodeStatus']['status'], "HEALTHY") - self.assertEquals(result['agentState'], State.STARTED) # result may or may NOT have an agentEnv structure in it self.assertEquals((len(result) is 6) or (len(result) is 7), True) self.assertEquals(not heartbeat.reports, True, @@ -147,7 +146,7 @@ class TestHeartbeat(TestCase): } heartbeat = Heartbeat(actionQueue, config) # State.STARTED results in agentState to be set to 4 (enum order) - hb = heartbeat.build({}, State.STARTED, 10) 
+ hb = heartbeat.build({}, 10) hb['hostname'] = 'hostname' hb['timestamp'] = 'timestamp' hb['fqdn'] = 'fqdn' @@ -155,7 +154,7 @@ class TestHeartbeat(TestCase): {'status': 'HEALTHY', 'cause': 'NONE'}, 'timestamp': 'timestamp', 'hostname': 'hostname', 'fqdn': 'fqdn', - 'agentState': 4, 'responseId': 10, 'reports': [ + 'responseId': 10, 'reports': [ {'status': 'IN_PROGRESS', 'roleCommand': u'INSTALL', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'stderr': 'Read from /tmp/errors-3.txt', @@ -178,6 +177,81 @@ class TestHeartbeat(TestCase): self.assertEqual.__self__.maxDiff = None self.assertEquals(hb, expected) + @patch.object(ActionQueue, "result") + def test_build_result2(self, result_mock): + config = AgentConfig("", "") + config.set('agent', 'prefix', 'tmp') + dummy_controller = MagicMock() + actionQueue = ActionQueue(config, dummy_controller) + result_mock.return_value = { + 'reports': [{'status': 'IN_PROGRESS', + 'stderr': 'Read from /tmp/errors-3.txt', + 'stdout': 'Read from /tmp/output-3.txt', + 'clusterName': u'cc', + 'roleCommand': u'INSTALL', + 'serviceName': u'HDFS', + 'role': u'DATANODE', + 'actionId': '1-1', + 'taskId': 3, + 'exitcode': 777, + 'reportResult' : False} + ], + 'componentStatus': [] + } + heartbeat = Heartbeat(actionQueue, config) + + commandResult = {} + hb = heartbeat.build(commandResult, 10) + hb['hostname'] = 'hostname' + hb['timestamp'] = 'timestamp' + hb['fqdn'] = 'fqdn' + expected = {'nodeStatus': + {'status': 'HEALTHY', + 'cause': 'NONE'}, + 'timestamp': 'timestamp', 'hostname': 'hostname', 'fqdn': 'fqdn', + 'responseId': 10, 'reports': []} + self.assertEqual.__self__.maxDiff = None + self.assertEquals(hb, expected) + self.assertEquals(commandResult, {'commandStatus': 'IN_PROGRESS'}) + + @patch.object(ActionQueue, "result") + def test_build_result3(self, result_mock): + config = AgentConfig("", "") + config.set('agent', 'prefix', 'tmp') + dummy_controller = MagicMock() + actionQueue = ActionQueue(config, 
dummy_controller) + result_mock.return_value = { + 'reports': [{'status': 'COMPLETED', + 'stderr': 'Read from /tmp/errors-3.txt', + 'stdout': 'Read from /tmp/output-3.txt', + 'clusterName': u'cc', + 'roleCommand': u'INSTALL', + 'serviceName': u'HDFS', + 'role': u'DATANODE', + 'actionId': '1-1', + 'taskId': 3, + 'exitcode': 777, + 'reportResult' : False} + ], + 'componentStatus': [] + } + heartbeat = Heartbeat(actionQueue, config) + + commandResult = {} + hb = heartbeat.build(commandResult, 10) + hb['hostname'] = 'hostname' + hb['timestamp'] = 'timestamp' + hb['fqdn'] = 'fqdn' + expected = {'nodeStatus': + {'status': 'HEALTHY', + 'cause': 'NONE'}, + 'timestamp': 'timestamp', 'hostname': 'hostname', 'fqdn': 'fqdn', + 'responseId': 10, 'reports': []} + self.assertEqual.__self__.maxDiff = None + self.assertEquals(hb, expected) + self.assertEquals(commandResult, {'commandStatus': 'COMPLETED'}) + + if __name__ == "__main__": logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestRegistration.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestRegistration.py b/slider-agent/src/test/python/agent/TestRegistration.py index 5245af9..f91fe29 100644 --- a/slider-agent/src/test/python/agent/TestRegistration.py +++ b/slider-agent/src/test/python/agent/TestRegistration.py @@ -26,6 +26,7 @@ import tempfile from mock.mock import patch from mock.mock import MagicMock from Register import Register +from Controller import State from AgentConfig import AgentConfig class TestRegistration(TestCase): @@ -47,14 +48,17 @@ class TestRegistration(TestCase): text_file.write("1.3.0") register = Register(config) - data = register.build(1) + data = register.build(State.INIT, State.INIT, {}, 1) #print ("Register: " + pprint.pformat(data)) self.assertEquals(data['hostname'] != "", True, "hostname 
should not be empty") self.assertEquals(data['publicHostname'] != "", True, "publicHostname should not be empty") self.assertEquals(data['responseId'], 1) self.assertEquals(data['timestamp'] > 1353678475465L, True, "timestamp should not be empty") self.assertEquals(data['agentVersion'], '1.3.0', "agentVersion should not be empty") - self.assertEquals(len(data), 5) + self.assertEquals(data['actualState'], State.INIT, "actualState should not be empty") + self.assertEquals(data['expectedState'], State.INIT, "expectedState should not be empty") + self.assertEquals(data['allocatedPorts'], {}, "allocatedPorts should be empty") + self.assertEquals(len(data), 8) self.assertEquals(os.path.join(tmpdir, "app/definition"), config.getResolvedPath("app_pkg_dir")) self.assertEquals(os.path.join(tmpdir, "app/install"), config.getResolvedPath("app_install_dir")) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index b22ef39..8ce1f18 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -398,9 +398,19 @@ public class AgentProviderService extends AbstractProviderService implements log.info("Handling registration: " + registration); RegistrationResponse response = new RegistrationResponse(); String label = registration.getHostname(); + State agentState = registration.getActualState(); if (getComponentStatuses().containsKey(label)) { response.setResponseStatus(RegistrationStatus.OK); - getComponentStatuses().get(label).heartbeat(System.currentTimeMillis()); + ComponentInstanceState 
componentStatus = getComponentStatuses().get(label); + componentStatus.heartbeat(System.currentTimeMillis()); + updateComponentStatusWithAgentState(componentStatus, agentState); + + Map<String, String> ports = registration.getAllocatedPorts(); + if (ports != null && !ports.isEmpty()) { + String roleName = getRoleName(label); + String containerId = getContainerId(label); + processAllocatedPorts(registration.getPublicHostname(), roleName, containerId, ports); + } } else { response.setResponseStatus(RegistrationStatus.FAILED); response.setLog("Label not recognized."); @@ -424,9 +434,8 @@ public class AgentProviderService extends AbstractProviderService implements String label = heartBeat.getHostname(); String roleName = getRoleName(label); - State agentState = heartBeat.getAgentState(); - String containerId = getContainerId(label); + StateAccessForProviders accessor = getAmState(); String scriptPath = getScriptPathFromMetainfo(roleName); @@ -441,13 +450,22 @@ public class AgentProviderService extends AbstractProviderService implements Boolean isMaster = isMaster(roleName); ComponentInstanceState componentStatus = getComponentStatuses().get(label); - updateComponentStatusWithAgentState(componentStatus, agentState); componentStatus.heartbeat(System.currentTimeMillis()); // If no Master can explicitly publish then publish if its a master // Otherwise, wait till the master that can publish is ready if (isMaster && (canAnyMasterPublishConfig() == false || canPublishConfig(roleName))) { publishConfigAndExportGroups(heartBeat, componentStatus); + } else { + // Ack that config has been reported + List<ComponentStatus> statuses = heartBeat.getComponentStatus(); + if (statuses != null && !statuses.isEmpty()) { + ComponentStatus status = statuses.get(0); + if(status.getConfigs().size() > 0) { + log.info("Config got reported by {} but discarded as component {} cannot publish.", label, roleName); + componentStatus.setConfigReported(true); + } + } } List<CommandReport> reports = 
heartBeat.getReports(); @@ -455,14 +473,7 @@ public class AgentProviderService extends AbstractProviderService implements CommandReport report = reports.get(0); Map<String, String> ports = report.getAllocatedPorts(); if (ports != null && !ports.isEmpty()) { - for (Map.Entry<String, String> port : ports.entrySet()) { - log.info("Recording allocated port for {} as {}", port.getKey(), port.getValue()); - this.getAllocatedPorts().put(port.getKey(), port.getValue()); - this.getAllocatedPorts(containerId).put(port.getKey(), port.getValue()); - } - - // component specific publishes - processAndPublishComponentSpecificData(ports, containerId, heartBeat.getFqdn(), roleName); + processAllocatedPorts(heartBeat.getFqdn(), roleName, containerId, ports); } CommandResult result = CommandResult.getCommandResult(report.getStatus()); Command command = Command.getCommand(report.getRoleCommand()); @@ -506,6 +517,7 @@ public class AgentProviderService extends AbstractProviderService implements if (isMaster && componentStatus.getState() == State.STARTED && command == Command.NOP) { if (!componentStatus.getConfigReported()) { + log.info("Requesting applied config for {} on {}.", roleName, containerId); addGetConfigCommand(roleName, containerId, response); } } @@ -525,6 +537,20 @@ public class AgentProviderService extends AbstractProviderService implements return response; } + protected void processAllocatedPorts(String fqdn, + String roleName, + String containerId, + Map<String, String> ports) { + for (Map.Entry<String, String> port : ports.entrySet()) { + log.info("Recording allocated port for {} as {}", port.getKey(), port.getValue()); + this.getAllocatedPorts().put(port.getKey(), port.getValue()); + this.getAllocatedPorts(containerId).put(port.getKey(), port.getValue()); + } + + // component specific publishes + processAndPublishComponentSpecificData(ports, containerId, fqdn, roleName); + } + private void updateComponentStatusWithAgentState( ComponentInstanceState componentStatus, 
State agentState) { if (agentState != null) { @@ -803,6 +829,7 @@ public class AgentProviderService extends AbstractProviderService implements } } } + log.info("Received and stored config for {}", heartBeat.getHostname()); componentStatus.setConfigReported(true); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java index 62df18d..a08d46d 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java @@ -45,7 +45,6 @@ public class HeartBeat { HostStatus nodeStatus; private AgentEnv agentEnv = null; private String fqdn; - private State agentState; public long getResponseId() { return responseId; @@ -125,14 +124,6 @@ public class HeartBeat { this.mounts = mounts; } - public State getAgentState() { - return agentState; - } - - public void setAgentState(State agentState) { - this.agentState = agentState; - } - @Override public String toString() { return "HeartBeat{" + @@ -142,7 +133,6 @@ public class HeartBeat { ", reports=" + reports + ", componentStatus=" + componentStatus + ", nodeStatus=" + nodeStatus + - ", agentState=" + agentState + '}'; } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java index 9299a16..a44c3a4 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java @@ -16,15 +16,14 @@ */ package org.apache.slider.server.appmaster.web.rest.agent; +import org.apache.slider.providers.agent.State; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.annotate.JsonSerialize; -/** - * - * Data model for agent to send heartbeat to ambari and/or app master. - * - */ +import java.util.Map; + +/** Data model for agent to send heartbeat to ambari and/or app master. */ @JsonIgnoreProperties(ignoreUnknown = true) @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) public class Register { @@ -36,6 +35,9 @@ public class Register { private String publicHostname; private AgentEnv agentEnv; private String agentVersion; + private State actualState; + private State expectedState; + private Map<String, String> allocatedPorts; @JsonProperty("responseId") public int getResponseId() { @@ -44,13 +46,17 @@ public class Register { @JsonProperty("responseId") public void setResponseId(int responseId) { - this.responseId=responseId; + this.responseId = responseId; } public long getTimestamp() { return timestamp; } + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + public String getHostname() { return hostname; } @@ -67,10 +73,6 @@ public class Register { this.hardwareProfile = hardwareProfile; } - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - public String getPublicHostname() { return publicHostname; } @@ -103,15 +105,45 @@ public class Register { this.currentPingPort = currentPingPort; } + public State getActualState() { + return actualState; + } + + public void setActualState(State 
actualState) { + this.actualState = actualState; + } + + public State getExpectedState() { + return expectedState; + } + + public void setExpectedState(State expectedState) { + this.expectedState = expectedState; + } + + /** @return the allocated ports, or <code>null</code> if none are present */ + @JsonProperty("allocatedPorts") + public Map<String, String> getAllocatedPorts() { + return allocatedPorts; + } + + /** @param ports allocated ports */ + @JsonProperty("allocatedPorts") + public void setAllocatedPorts(Map<String, String> ports) { + this.allocatedPorts = ports; + } + @Override public String toString() { String ret = "responseId=" + responseId + "\n" + "timestamp=" + timestamp + "\n" + - "hostname=" + hostname + "\n" + - "currentPingPort=" + currentPingPort + "\n"; + "hostname=" + hostname + "\n" + + "expectedState=" + expectedState + "\n" + + "actualState=" + actualState + "\n"; - if (hardwareProfile != null) + if (hardwareProfile != null) { ret = ret + "hardwareprofile=" + this.hardwareProfile.toString(); + } return ret; } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java index 1aeedbc..20c16ff 100644 --- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java @@ -87,6 +87,7 @@ import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.anyString; import 
static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -271,6 +272,12 @@ public class TestAgentProviderService { } catch (SliderException e) { } + doNothing().when(mockAps).processAllocatedPorts( + anyString(), + anyString(), + anyString(), + anyMap() + ); expect(access.isApplicationLive()).andReturn(true).anyTimes(); ClusterDescription desc = new ClusterDescription(); desc.setOption(OptionKeys.ZOOKEEPER_QUORUM, "host1:2181"); @@ -305,10 +312,20 @@ public class TestAgentProviderService { Register reg = new Register(); reg.setResponseId(0); reg.setHostname("mockcontainer_1___HBASE_MASTER"); + Map<String,String> ports = new HashMap(); + ports.put("a","100"); + reg.setAllocatedPorts(ports); RegistrationResponse resp = mockAps.handleRegistration(reg); Assert.assertEquals(0, resp.getResponseId()); Assert.assertEquals(RegistrationStatus.OK, resp.getResponseStatus()); + Mockito.verify(mockAps, Mockito.times(1)).processAllocatedPorts( + anyString(), + anyString(), + anyString(), + anyMap() + ); + HeartBeat hb = new HeartBeat(); hb.setResponseId(1); hb.setHostname("mockcontainer_1___HBASE_MASTER");