SLIDER-262. Slider agent should provide process supervision such as auto-restart
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/506eb79e Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/506eb79e Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/506eb79e Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry Commit: 506eb79e35ff6187648f4d874c22bb3fe3b18e58 Parents: 004fc8d Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Tue Aug 12 18:38:15 2014 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Thu Aug 14 17:56:52 2014 -0700 ---------------------------------------------------------------------- .../src/main/python/agent/ActionQueue.py | 22 +++++++--- slider-agent/src/main/python/agent/Constants.py | 1 + .../src/main/python/agent/Controller.py | 25 ++++++++++- .../python/agent/CustomServiceOrchestrator.py | 44 +++++++++++--------- slider-agent/src/main/python/agent/Heartbeat.py | 6 ++- .../src/test/python/agent/TestActionQueue.py | 5 ++- .../agent/TestCustomServiceOrchestrator.py | 8 ++-- .../src/test/python/agent/TestHeartbeat.py | 25 +++++++++-- .../slider/common/tools/CoreFileSystem.java | 3 ++ .../providers/agent/AgentProviderService.java | 35 +++++++++++++++- .../web/rest/agent/HeartBeatResponse.java | 11 +++++ .../agent/TestAgentProviderService.java | 21 ++++++---- 12 files changed, 162 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-agent/src/main/python/agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/ActionQueue.py b/slider-agent/src/main/python/agent/ActionQueue.py index d4d8bc2..774d1e6 100644 --- a/slider-agent/src/main/python/agent/ActionQueue.py +++ b/slider-agent/src/main/python/agent/ActionQueue.py @@ -49,6 +49,7 @@ class ActionQueue(threading.Thread): FAILED_STATUS = 'FAILED' STORE_APPLIED_CONFIG = 'record_config' + AUTO_RESTART = 'auto_restart' def __init__(self, config, controller): super(ActionQueue, self).__init__() @@ -120,6 +121,9 @@ class ActionQueue(threading.Thread): logger.debug(pprint.pformat(command)) taskId = command['taskId'] + + reportResult = not command[Constants.AUTO_GENERATED] + # Preparing 'IN_PROGRESS' report in_progress_status = self.commandStatuses.generate_report_template(command) in_progress_status.update({ @@ -127,12 +131,17 @@ class ActionQueue(threading.Thread): 'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt', 'structuredOut': self.tmpdir + os.sep + 'structured-out-' + str( taskId) + '.json', - 'status': self.IN_PROGRESS_STATUS + 'status': self.IN_PROGRESS_STATUS, + 'reportResult': reportResult }) - self.commandStatuses.put_command_status(command, in_progress_status) + self.commandStatuses.put_command_status(command, in_progress_status, reportResult) store_config = False if ActionQueue.STORE_APPLIED_CONFIG in command['commandParams']: store_config = 'true' == command['commandParams'][ActionQueue.STORE_APPLIED_CONFIG] + store_command = False + if ActionQueue.AUTO_RESTART in command['roleParams']: + store_command = 'true' == command['roleParams'][ActionQueue.AUTO_RESTART] + # running command commandresult = self.customServiceOrchestrator.runCommand(command, @@ -141,7 +150,7 @@ class ActionQueue(threading.Thread): in_progress_status[ 'tmperr'], True, - store_config) + store_config or store_command) # dumping results status = self.COMPLETED_STATUS if commandresult[Constants.EXIT_CODE] != 0: @@ -152,6 +161,7 @@ class ActionQueue(threading.Thread): 'stderr': commandresult['stderr'], Constants.EXIT_CODE: commandresult[Constants.EXIT_CODE], 'status': status, + 'reportResult': reportResult }) if roleResult['stdout'] == '': roleResult['stdout'] = 'None' @@ -170,7 +180,7 @@ class ActionQueue(threading.Thread): roleResult['allocatedPorts'] = commandresult[Constants.ALLOCATED_PORTS] if Constants.FOLDERS in commandresult: roleResult['folders'] = commandresult[Constants.FOLDERS] - self.commandStatuses.put_command_status(command, roleResult) + self.commandStatuses.put_command_status(command, roleResult, reportResult) # Store action result to agent response queue def result(self): @@ -185,8 +195,8 @@ class ActionQueue(threading.Thread): service = command['serviceName'] component = command['componentName'] reportResult = True - if 'auto_generated' in command: - reportResult = not command['auto_generated'] + if Constants.AUTO_GENERATED in command: + reportResult = not command[Constants.AUTO_GENERATED] component_status = self.customServiceOrchestrator.requestComponentStatus(command) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-agent/src/main/python/agent/Constants.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Constants.py b/slider-agent/src/main/python/agent/Constants.py index a120999..2975266 100644 --- a/slider-agent/src/main/python/agent/Constants.py +++ b/slider-agent/src/main/python/agent/Constants.py @@ -32,3 +32,4 @@ DO_NOT_HEARTBEAT = "DO_NOT_HEARTBEAT" DO_NOT_HEARTBEAT_AFTER_ = "DO_NOT_HEARTBEAT_AFTER_" ZK_QUORUM="zk_quorum" ZK_REG_PATH="zk_reg_path" +AUTO_GENERATED="auto_generated" http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 1a01bf8..77dcafb 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -86,6 +86,7 @@ class Controller(threading.Thread): self.statusCommand = None self.failureCount = 0 self.heartBeatRetryCount = 0 + self.autoRestart = False def __del__(self): @@ -224,6 +225,9 @@ class Controller(threading.Thread): serverId = int(response['responseId']) + if 'restartEnabled' in response: + restartEnabled = 'true' == response['restartEnabled'] + if 'hasMappedComponents' in response.keys(): self.hasMappedComponents = response['hasMappedComponents'] != False @@ -257,6 +261,17 @@ class Controller(threading.Thread): logger.info("No commands sent from the Server.") pass + # Add a start command + if self.componentActualState == State.INSTALLED and \ + self.componentExpectedState == State.STARTED and restartEnabled: + stored_command = self.actionQueue.customServiceOrchestrator.stored_command + if len(stored_command) > 0: + auto_start_command = self.create_start_command(stored_command) + if auto_start_command: + self.updateStateBasedOnCommand([auto_start_command]) + self.addToQueue([auto_start_command]) + pass + # Add a status command if (self.componentActualState != State.STARTING and \ self.componentExpectedState == State.STARTED) and \ @@ -328,6 +343,14 @@ class Controller(threading.Thread): pass logger.info("Controller stopped heart-beating.") + def create_start_command(self, stored_command): + taskId = int(stored_command['taskId']) + taskId = taskId + 1 + stored_command['taskId'] = taskId + stored_command['commandId'] = "{0}-1".format(taskId) + stored_command[Constants.AUTO_GENERATED] = True + pass + def updateStateBasedOnCommand(self, commands): for command in commands: if command["roleCommand"] == "START": @@ -388,7 +411,7 @@ class Controller(threading.Thread): statusCommand["hostLevelParams"] = command["hostLevelParams"] statusCommand["serviceName"] = command["serviceName"] statusCommand["taskId"] = "status" - statusCommand['auto_generated'] = True + statusCommand[Constants.AUTO_GENERATED] = True logger.info("Status command: " + pprint.pformat(statusCommand)) return statusCommand pass http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py index 0078c9c..6b2ace5 100644 --- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py +++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py @@ -56,7 +56,7 @@ class CustomServiceOrchestrator(): self.status_commands_stderr = os.path.realpath(posixpath.join(self.tmp_dir, 'status_command_stderr.txt')) self.public_fqdn = hostname.public_hostname() - self.applied_configs = {} + self.stored_command = {} # Clean up old status command files if any try: os.unlink(self.status_commands_stdout) @@ -68,7 +68,7 @@ class CustomServiceOrchestrator(): def runCommand(self, command, tmpoutfile, tmperrfile, - override_output_files=True, store_config=False): + override_output_files=True, store_command=False): allocated_port = {} try: script_type = command['commandParams']['script_type'] @@ -86,7 +86,7 @@ class CustomServiceOrchestrator(): # We don't support anything else yet message = "Unknown script type {0}".format(script_type) raise AgentException(message) - json_path = self.dump_command_to_json(command, allocated_port, store_config) + json_path = self.dump_command_to_json(command, allocated_port, store_command) py_file_list = [script_tuple] # filter None values filtered_py_file_list = [i for i in py_file_list if i] @@ -154,22 +154,28 @@ class CustomServiceOrchestrator(): return path def getConfig(self, command): - if 'commandParams' in command and 'config_type' in command['commandParams']: - config_type = command['commandParams']['config_type'] - logger.info("Requesting applied config for type {0}".format(config_type)) - if config_type in self.applied_configs: - return { - 'configurations': {config_type: self.applied_configs[config_type]} - } + if 'configurations' in self.stored_command: + if 'commandParams' in command and 'config_type' in command['commandParams']: + config_type = command['commandParams']['config_type'] + logger.info("Requesting applied config for type {0}".format(config_type)) + if config_type in self.stored_command['configurations']: + return { + 'configurations': {config_type: self.stored_command['configurations'][config_type]} + } + else: + return { + 'configurations': {} + } + pass else: + logger.info("Requesting all applied config.") return { - 'configurations': {} + 'configurations': self.stored_command['configurations'] } pass else: - logger.info("Requesting all applied config.") return { - 'configurations': self.applied_configs + 'configurations': {} } pass @@ -184,7 +190,7 @@ class CustomServiceOrchestrator(): override_output_files = False if command['roleCommand'] == "GET_CONFIG": - return self.getConfig(command) + return self.getConfig(command) else: res = self.runCommand(command, self.status_commands_stdout, @@ -198,7 +204,7 @@ class CustomServiceOrchestrator(): return res pass - def dump_command_to_json(self, command, allocated_ports, store_config=False): + def dump_command_to_json(self, command, allocated_ports, store_command=False): """ Converts command to json file and returns file path """ @@ -221,7 +227,7 @@ class CustomServiceOrchestrator(): if os.path.isfile(file_path): os.unlink(file_path) - self.finalize_command(command, store_config, allocated_ports) + self.finalize_command(command, store_command, allocated_ports) with os.fdopen(os.open(file_path, os.O_WRONLY | os.O_CREAT, 0600), 'w') as f: @@ -238,7 +244,7 @@ class CustomServiceOrchestrator(): Either a port gets allocated or if not then just set the value to "0" """ - def finalize_command(self, command, store_config, allocated_ports): + def finalize_command(self, command, store_command, allocated_ports): component = command['componentName'] allocated_for_this_component_format = "${{{0}.ALLOCATED_PORT}}" allocated_for_any = ".ALLOCATED_PORT}" @@ -267,9 +273,9 @@ class CustomServiceOrchestrator(): pass pass - if store_config: + if store_command: logger.info("Storing applied config: " + pprint.pformat(command['configurations'])) - self.applied_configs = command['configurations'] + self.stored_command = command pass http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-agent/src/main/python/agent/Heartbeat.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Heartbeat.py b/slider-agent/src/main/python/agent/Heartbeat.py index 4f2207c..aa403d4 100644 --- a/slider-agent/src/main/python/agent/Heartbeat.py +++ b/slider-agent/src/main/python/agent/Heartbeat.py @@ -57,7 +57,11 @@ class Heartbeat: if not self.actionQueue.commandQueue.empty(): commandsInProgress = True if len(queueResult) != 0: - heartbeat['reports'] = queueResult['reports'] + heartbeat['reports'] = [] + for report in queueResult['reports']: + if report['reportResult']: + del report['reportResult'] + heartbeat['reports'].append(report) if len(heartbeat['reports']) > 0: # There may be IN_PROGRESS tasks commandsInProgress = True http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-agent/src/test/python/agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestActionQueue.py b/slider-agent/src/test/python/agent/TestActionQueue.py index 21cc5dd..f0935a9 100644 --- a/slider-agent/src/test/python/agent/TestActionQueue.py +++ b/slider-agent/src/test/python/agent/TestActionQueue.py @@ -61,6 +61,8 @@ class TestActionQueue(TestCase): 'serviceName': u'HBASE', 'configurations': {'global': {}}, 'configurationTags': {'global': {'tag': 'v1'}}, + 'auto_generated': False, + 'roleParams': {'auto_restart':'false'}, 'commandParams': {'script_type': 'PYTHON', 'script': 'scripts/abc.py', 'command_timeout': '600'} @@ -365,7 +367,8 @@ class TestActionQueue(TestCase): 'structuredOut': '', 'exitcode': 0, 'allocatedPorts': {}, - 'folders': {'AGENT_LOG_ROOT': tempdir, 'AGENT_WORK_ROOT': tempdir}} + 'folders': {'AGENT_LOG_ROOT': tempdir, 'AGENT_WORK_ROOT': tempdir}, + 'reportResult': True} self.assertEqual(len(report['reports']), 1) self.assertEqual(report['reports'][0], expected) self.assertTrue(os.path.isfile(configname)) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py index 48e4df4..6408809 100644 --- a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py +++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py @@ -38,6 +38,7 @@ from socket import socket class TestCustomServiceOrchestrator(TestCase): + def setUp(self): # disable stdout out = StringIO.StringIO() @@ -415,9 +416,10 @@ class TestCustomServiceOrchestrator(TestCase): } ret = orchestrator.runCommand(command, "out.txt", "err.txt", True, True) + self.assertEqual.__self__.maxDiff = None self.assertEqual(ret['exitcode'], 0) self.assertTrue(run_file_mock.called) - self.assertEqual(orchestrator.applied_configs, expected) + self.assertEqual(orchestrator.stored_command, command) ret = orchestrator.requestComponentStatus(command_get) self.assertEqual(ret['configurations'], expected) @@ -498,7 +500,7 @@ class TestCustomServiceOrchestrator(TestCase): self.assertEqual(command['configurations']['oozie-site']['ignore_port2'], "[0,0]") self.assertEqual(command['configurations']['oozie-site']['ignore_port3'], "[0,0,0]") self.assertEqual(command['configurations']['oozie-site']['ignore_port4'], "${HBASE_RS}{a}{b}{c}") - self.assertEqual(orchestrator.applied_configs, {}) + self.assertEqual(orchestrator.stored_command, {}) self.assertEqual(len(allocated_ports), 1) self.assertTrue('oozie-site.a_port' in allocated_ports) self.assertEqual(allocated_ports['oozie-site.a_port'], '10023') @@ -513,7 +515,7 @@ class TestCustomServiceOrchestrator(TestCase): self.assertEqual(command['configurations']['hbase-site']['log_root'], tempdir + "/log") self.assertEqual(command['configurations']['hbase-site']['blog_root'], "/b/" + tempdir + "/log") self.assertEqual(command['configurations']['oozie-site']['b_port'], "0") - self.assertEqual(orchestrator.applied_configs, command['configurations']) + self.assertEqual(orchestrator.stored_command, command) def test_port_allocation(self): http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-agent/src/test/python/agent/TestHeartbeat.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestHeartbeat.py b/slider-agent/src/test/python/agent/TestHeartbeat.py index a9da65d..70870ec 100644 --- a/slider-agent/src/test/python/agent/TestHeartbeat.py +++ b/slider-agent/src/test/python/agent/TestHeartbeat.py @@ -86,7 +86,8 @@ class TestHeartbeat(TestCase): 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 3, - 'exitcode': 777}, + 'exitcode': 777, + 'reportResult' : True}, {'status': 'COMPLETED', 'stderr': 'stderr', @@ -97,7 +98,8 @@ class TestHeartbeat(TestCase): 'role': 'role', 'actionId': 17, 'taskId': 'taskId', - 'exitcode': 0}, + 'exitcode': 0, + 'reportResult' : True}, {'status': 'FAILED', 'stderr': 'stderr', @@ -108,7 +110,8 @@ class TestHeartbeat(TestCase): 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 3, - 'exitcode': 13}, + 'exitcode': 13, + 'reportResult' : True}, {'status': 'COMPLETED', 'stderr': 'stderr', @@ -120,8 +123,21 @@ class TestHeartbeat(TestCase): 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 3, - 'exitcode': 0} + 'exitcode': 0, + 'reportResult' : True}, + {'status': 'COMPLETED', + 'stderr': 'stderr', + 'stdout': 'out', + 'clusterName': u'cc', + 'configurationTags': {'global': {'tag': 'v1'}}, + 'roleCommand': u'INSTALL', + 'serviceName': u'HDFS', + 'role': u'DATANODE', + 'actionId': '1-1', + 'taskId': 3, + 'exitcode': 0, + 'reportResult' : False} ], 'componentStatus': [ {'status': 'HEALTHY', 'componentName': 'DATANODE', 'reportResult' : True}, @@ -159,6 +175,7 @@ class TestHeartbeat(TestCase): 'stderr': 'stderr'}], 'componentStatus': [ {'status': 'HEALTHY', 'componentName': 'DATANODE'}, {'status': 'UNHEALTHY', 'componentName': 'NAMENODE'}]} + self.assertEqual.__self__.maxDiff = None self.assertEquals(hb, expected) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java b/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java index a145b8f..def252a 100644 --- a/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java +++ b/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java @@ -50,6 +50,9 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; +import java.util.Enumeration; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; import static org.apache.slider.common.SliderXmlConfKeys.CLUSTER_DIRECTORY_PERMISSIONS; import static org.apache.slider.common.SliderXmlConfKeys.DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index 559c4e7..a40926a 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -495,7 +495,7 @@ public class AgentProviderService extends AbstractProviderService implements boolean canExecute = commandOrder.canExecute(roleName, command, getComponentStatuses().values()); if (canExecute) { log.info("Starting {} on {}.", roleName, containerId); - addStartCommand(roleName, containerId, response, scriptPath); + addStartCommand(roleName, containerId, response, scriptPath, isMarkedAutoRestart(roleName)); componentStatus.commandIssued(command); } else { log.info("Start of {} on {} delayed as dependencies have not started.", roleName, containerId); @@ -509,6 +509,13 @@ public class AgentProviderService extends AbstractProviderService implements addGetConfigCommand(roleName, containerId, response); } } + + // if restart is required then signal + response.setRestartEnabled(false); + if (componentStatus.getState() == State.STARTED + && command == Command.NOP && isMarkedAutoRestart(roleName)) { + response.setRestartEnabled(true); + } } catch (SliderException e) { componentStatus.applyCommandResult(CommandResult.FAILED, command); log.warn("Component instance failed operation.", e); @@ -936,6 +943,25 @@ public class AgentProviderService extends AbstractProviderService implements } /** + * Checks if the role is marked auto-restart + * @param roleName + * @return + */ + protected boolean isMarkedAutoRestart(String roleName) { + Application application = getMetainfo().getApplication(); + if (application == null) { + log.error("Malformed app definition: Expect application as the top level element for metainfo.xml"); + } else { + for (Component component : application.getComponents()) { + if (component.getName().equals(roleName)) { + return component.getRequiresAutoRestart(); + } + } + } + return false; + } + + /** * Can any master publish config explicitly, if not a random master is used * @return */ @@ -1104,7 +1130,8 @@ public class AgentProviderService extends AbstractProviderService implements } @VisibleForTesting - protected void addStartCommand(String roleName, String containerId, HeartBeatResponse response, String scriptPath) + protected void addStartCommand(String roleName, String containerId, HeartBeatResponse response, + String scriptPath, Boolean isMarkedAutoRestart) throws SliderException { assert getAmState().isApplicationLive(); @@ -1126,6 +1153,10 @@ public class AgentProviderService extends AbstractProviderService implements hostLevelParams.put(CONTAINER_ID, containerId); cmd.setHostLevelParams(hostLevelParams); + Map<String, String> roleParams = new TreeMap<>(); + cmd.setRoleParams(roleParams); + cmd.getRoleParams().put("auto_restart", Boolean.toString(isMarkedAutoRestart)); + cmd.setCommandParams(setCommandParameters(scriptPath, true)); Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java index ca2db32..0545499 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java @@ -40,6 +40,7 @@ public class HeartBeatResponse { RegistrationCommand registrationCommand; boolean restartAgent = false; + boolean restartEnabled = true; boolean hasMappedComponents = false; @JsonProperty("responseId") @@ -92,6 +93,16 @@ public class HeartBeatResponse { this.restartAgent = restartAgent; } + @JsonProperty("restartEnabled") + public boolean getRstartEnabled() { + return restartEnabled; + } + + @JsonProperty("restartEnabled") + public void setRestartEnabled(boolean restartEnabled) { + this.restartEnabled = restartEnabled; + } + @JsonProperty("hasMappedComponents") public boolean hasMappedComponents() { return hasMappedComponents; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/506eb79e/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java index a68e770..463799c 100644 --- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java @@ -78,6 +78,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.easymock.EasyMock.anyBoolean; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; @@ -642,7 +643,8 @@ public class TestAgentProviderService { anyString(), anyString(), any(HeartBeatResponse.class), - anyString()); + anyString(), + anyBoolean()); doNothing().when(mockAps).addGetConfigCommand( anyString(), anyString(), @@ -744,7 +746,8 @@ public class TestAgentProviderService { Mockito.verify(mockAps, Mockito.times(0)).addStartCommand(anyString(), anyString(), any(HeartBeatResponse.class), - anyString()); + anyString(), + anyBoolean()); // RS still does not start hb = new HeartBeat(); hb.setResponseId(3); @@ -754,7 +757,8 @@ public class TestAgentProviderService { Mockito.verify(mockAps, Mockito.times(0)).addStartCommand(anyString(), anyString(), any(HeartBeatResponse.class), - anyString()); + anyString(), + anyBoolean()); // MASTER succeeds install and issues start hb = new HeartBeat(); @@ -773,7 +777,8 @@ public class TestAgentProviderService { Mockito.verify(mockAps, Mockito.times(1)).addStartCommand(anyString(), anyString(), any(HeartBeatResponse.class), - anyString()); + anyString(), + anyBoolean()); Map<String, String> allocatedPorts = mockAps.getAllocatedPorts(); Assert.assertTrue(allocatedPorts != null); Assert.assertTrue(allocatedPorts.size() == 1); @@ -788,7 +793,8 @@ public class TestAgentProviderService { Mockito.verify(mockAps, Mockito.times(1)).addStartCommand(anyString(), anyString(), any(HeartBeatResponse.class), - anyString()); + anyString(), + anyBoolean()); // MASTER succeeds start hb = new HeartBeat(); hb.setResponseId(3); @@ -812,7 +818,8 @@ public class TestAgentProviderService { Mockito.verify(mockAps, Mockito.times(2)).addStartCommand(anyString(), anyString(), any(HeartBeatResponse.class), - anyString()); + anyString(), + anyBoolean()); // JDK7 } catch (SliderException he) { log.warn(he.getMessage()); @@ -959,7 +966,7 @@ public class TestAgentProviderService { replay(access); - mockAps.addStartCommand("HBASE_MASTER", "cid1", hbr, ""); + mockAps.addStartCommand("HBASE_MASTER", "cid1", hbr, "", Boolean.FALSE); Assert.assertTrue(hbr.getExecutionCommands().get(0).getConfigurations().containsKey("hbase-site")); Map<String, String> hbaseSiteConf = hbr.getExecutionCommands().get(0).getConfigurations().get("hbase-site"); Assert.assertTrue(hbaseSiteConf.containsKey("a.port"));