Repository: incubator-slider
Updated Branches:
  refs/heads/feature/SLIDER-181_Storm_Package ee8bf826b -> b36c56acf
SLIDER-181. Slider support for Storm application package SLIDER-262. Slider agent should provide process supervision such as auto-restart SLIDER-123. Status check for Storm on YARN needs to be robust Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/b36c56ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/b36c56ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/b36c56ac Branch: refs/heads/feature/SLIDER-181_Storm_Package Commit: b36c56acf8b61fca3e70bba635a74e641209e70c Parents: ee8bf82 Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Mon Aug 11 09:54:13 2014 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Mon Aug 11 09:54:15 2014 -0700 ---------------------------------------------------------------------- app-packages/storm/appConfig.json | 2 +- app-packages/storm/metainfo.xml | 5 ++++ app-packages/storm/package/scripts/service.py | 24 ++++++++------- .../storm/package/scripts/status_params.py | 1 + .../src/main/python/agent/Controller.py | 3 +- .../python/agent/CustomServiceOrchestrator.py | 31 +++++++++++++++++++- .../agent/TestCustomServiceOrchestrator.py | 8 +++++ .../java/org/apache/slider/api/OptionKeys.java | 8 +---- .../providers/agent/AgentProviderService.java | 24 +++++++++------ .../agent/application/metadata/Component.java | 13 ++++++++ .../application/metadata/MetainfoParser.java | 1 + .../agent/TestAgentProviderService.java | 6 ++++ 12 files changed, 96 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/app-packages/storm/appConfig.json ---------------------------------------------------------------------- diff --git a/app-packages/storm/appConfig.json b/app-packages/storm/appConfig.json index 7f31030..24078cf 100644 --- a/app-packages/storm/appConfig.json +++ 
b/app-packages/storm/appConfig.json @@ -105,7 +105,7 @@ "site.storm-site.topology.trident.batch.emit.interval.millis": "500", "site.storm-site.topology.builtin.metrics.bucket.size.secs": "60", "site.storm-site.storm.thrift.transport": "backtype.storm.security.auth.SimpleTransportPlugin", - "site.storm-site.logviewer.port": "${SUPERVISOR.ALLOCATED_PORT}.{DO_NOT_PROPAGATE}", + "site.storm-site.logviewer.port": "${SUPERVISOR.ALLOCATED_PORT}{DO_NOT_PROPAGATE}", "site.storm-site.topology.debug": "false" }, "components": { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/app-packages/storm/metainfo.xml ---------------------------------------------------------------------- diff --git a/app-packages/storm/metainfo.xml b/app-packages/storm/metainfo.xml index fd16e61..e94bc62 100644 --- a/app-packages/storm/metainfo.xml +++ b/app-packages/storm/metainfo.xml @@ -77,6 +77,7 @@ <component> <name>NIMBUS</name> <category>MASTER</category> + <autoStartOnFailure>true</autoStartOnFailure> <commandScript> <script>scripts/nimbus.py</script> <scriptType>PYTHON</scriptType> @@ -87,6 +88,7 @@ <component> <name>STORM_REST_API</name> <category>MASTER</category> + <autoStartOnFailure>true</autoStartOnFailure> <commandScript> <script>scripts/rest_api.py</script> <scriptType>PYTHON</scriptType> @@ -97,6 +99,7 @@ <component> <name>SUPERVISOR</name> <category>SLAVE</category> + <autoStartOnFailure>true</autoStartOnFailure> <exports> <export> <name>log_viewer_port</name> @@ -114,6 +117,7 @@ <name>STORM_UI_SERVER</name> <category>MASTER</category> <publishConfig>true</publishConfig> + <autoStartOnFailure>true</autoStartOnFailure> <commandScript> <script>scripts/ui_server.py</script> <scriptType>PYTHON</scriptType> @@ -124,6 +128,7 @@ <component> <name>DRPC_SERVER</name> <category>MASTER</category> + <autoStartOnFailure>true</autoStartOnFailure> <commandScript> <script>scripts/drpc_server.py</script> <scriptType>PYTHON</scriptType> 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/app-packages/storm/package/scripts/service.py ---------------------------------------------------------------------- diff --git a/app-packages/storm/package/scripts/service.py b/app-packages/storm/package/scripts/service.py index 10fa5b9..13fcef2 100644 --- a/app-packages/storm/package/scripts/service.py +++ b/app-packages/storm/package/scripts/service.py @@ -22,7 +22,9 @@ limitations under the License. from resource_management import * import time - +""" +Slider package uses jps as pgrep does not list the whole process start command +""" def service( name, action='start'): @@ -30,25 +32,25 @@ def service( import status_params pid_file = status_params.pid_files[name] + container_id = status_params.container_id no_op_test = format("ls {pid_file} >/dev/null 2>&1 && ps `cat {pid_file}` >/dev/null 2>&1") + jps_path = format("{java64_home}/bin/jps") - grep_and_awk = "| grep -v grep | awk '{print $1}'" + grep_and_awk = format("| grep {container_id}") + " | awk '{print $1}'" if name == 'ui': - #process_cmd = "^java.+backtype.storm.ui.core$" - pid_chk_cmd = format("{jps_path} -vl | grep \"^[0-9 ]*backtype.storm.ui.core\" {grep_and_awk} > {pid_file}") + crt_pid_cmd = format("{jps_path} -vl | grep \"^[0-9 ]*backtype.storm.ui.core\" {grep_and_awk} > {pid_file}") elif name == "rest_api": - process_cmd = format("{java64_home}/bin/java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server") - crt_pid_cmd = format("pgrep -f \"{process_cmd}\" && pgrep -f \"{process_cmd}\" > {pid_file}") + rest_process_cmd = format("{java64_home}/bin/java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server") + crt_pid_cmd = format("pgrep -f \"{rest_process_cmd}\" > {pid_file}") else: - #process_cmd = format("^java.+backtype.storm.daemon.{name}$") - pid_chk_cmd = format("{jps_path} -vl | grep \"^[0-9 ]*backtype.storm.daemon.{name}\" {grep_and_awk} > {pid_file}") + 
crt_pid_cmd = format("{jps_path} -vl | grep \"^[0-9 ]*backtype.storm.daemon.{name}\" {grep_and_awk} > {pid_file}") if action == "start": if name == "rest_api": - cmd = format("{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log") + cmd = format("{rest_process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log") else: - cmd = format("env JAVA_HOME={java64_home} PATH=$PATH:{java64_home}/bin STORM_BASE_DIR={app_root} STORM_CONF_DIR={conf_dir} {storm_bin} {name}") + cmd = format("env JAVA_HOME={java64_home} PATH=$PATH:{java64_home}/bin STORM_BASE_DIR={app_root} STORM_CONF_DIR={conf_dir} {storm_bin} {name} > {log_dir}/{name}.out 2>&1") Execute(cmd, not_if=no_op_test, @@ -67,7 +69,7 @@ def service( else: content = None for i in xrange(12): - Execute(pid_chk_cmd, + Execute(crt_pid_cmd, user=params.storm_user, logoutput=True ) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/app-packages/storm/package/scripts/status_params.py ---------------------------------------------------------------------- diff --git a/app-packages/storm/package/scripts/status_params.py b/app-packages/storm/package/scripts/status_params.py index eab83cf..5907446 100644 --- a/app-packages/storm/package/scripts/status_params.py +++ b/app-packages/storm/package/scripts/status_params.py @@ -21,6 +21,7 @@ from resource_management import * config = Script.get_config() +container_id = config['configurations']['global']['app_container_id'] pid_dir = config['configurations']['global']['app_pid_dir'] pid_nimbus = format("{pid_dir}/nimbus.pid") pid_supervisor = format("{pid_dir}/supervisor.pid") http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 92e9086..e097af0 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ 
b/slider-agent/src/main/python/agent/Controller.py @@ -231,7 +231,8 @@ class Controller(threading.Thread): return if serverId != self.responseId + 1: - logger.error("Error in responseId sequence - restarting") + logger.error("Error in responseId sequence expected " + str(self.responseId + 1) + + " but got " + str(serverId) + " - restarting") self.restartAgent() else: self.responseId = serverId http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py index 0f4acc6..0078c9c 100644 --- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py +++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py @@ -259,7 +259,7 @@ class CustomServiceOrchestrator(): elif allocated_for_any in value: ## All unallocated ports should be set to 0 logger.info("Assigning port 0 " + "for " + value) - value = "0" + value = self.set_all_unallocated_ports(value) command['configurations'][key][k] = value pass pass @@ -274,6 +274,35 @@ class CustomServiceOrchestrator(): pass """ + All unallocated ports should be set to 0 + Look for "${SOME_COMPONENT_NAME.ALLOCATED_PORT}" + or "${component.ALLOCATED_PORT}{DEFAULT_port}" + or "${component.ALLOCATED_PORT}{DEFAULT_port}{DO_NOT_PROPAGATE}" + """ + + def set_all_unallocated_ports(self, value): + pattern_start = "${" + sub_section_start = "}{" + pattern_end = "}" + index = value.find(pattern_start) + while index != -1: + replace_index_start = index + replace_index_end = value.find(pattern_end, replace_index_start) + next_pattern_start = value.find(sub_section_start, replace_index_start) + while next_pattern_start == replace_index_end: + replace_index_end = value.find(pattern_end, replace_index_end + 1) + next_pattern_start = 
value.find(sub_section_start, next_pattern_start + 1) + pass + + value = value[:replace_index_start] + "0" + value[replace_index_end + 1:] + + # look for the next + index = value.find(pattern_start) + + return value + pass + + """ Port allocation can asks for multiple dynamic ports port_req_pattern is of type ${component_name.ALLOCATED_PORT} append {DEFAULT_ and find the default value http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py index c07f05f..48e4df4 100644 --- a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py +++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py @@ -484,12 +484,20 @@ class TestCustomServiceOrchestrator(TestCase): command['configurations']['oozie-site'] = {} command['configurations']['oozie-site']['log_root'] = "${AGENT_LOG_ROOT}" command['configurations']['oozie-site']['a_port'] = "${HBASE_MASTER.ALLOCATED_PORT}" + command['configurations']['oozie-site']['ignore_port1'] = "[${HBASE_RS.ALLOCATED_PORT}]" + command['configurations']['oozie-site']['ignore_port2'] = "[${HBASE_RS.ALLOCATED_PORT},${HBASE_REST.ALLOCATED_PORT}{DO_NOT_PROPAGATE}]" + command['configurations']['oozie-site']['ignore_port3'] = "[${HBASE_RS.ALLOCATED_PORT}{a}{b}{c},${A.ALLOCATED_PORT}{DO_NOT_PROPAGATE},${A.ALLOCATED_PORT}{DEFAULT_3}{DO_NOT_PROPAGATE}]" + command['configurations']['oozie-site']['ignore_port4'] = "${HBASE_RS}{a}{b}{c}" allocated_ports = {} orchestrator.finalize_command(command, False, allocated_ports) self.assertEqual(command['configurations']['hbase-site']['work_root'], tempWorkDir) self.assertEqual(command['configurations']['oozie-site']['log_root'], tempdir) 
self.assertEqual(command['configurations']['oozie-site']['a_port'], "10023") + self.assertEqual(command['configurations']['oozie-site']['ignore_port1'], "[0]") + self.assertEqual(command['configurations']['oozie-site']['ignore_port2'], "[0,0]") + self.assertEqual(command['configurations']['oozie-site']['ignore_port3'], "[0,0,0]") + self.assertEqual(command['configurations']['oozie-site']['ignore_port4'], "${HBASE_RS}{a}{b}{c}") self.assertEqual(orchestrator.applied_configs, {}) self.assertEqual(len(allocated_ports), 1) self.assertTrue('oozie-site.a_port' in allocated_ports) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java b/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java index 048fefa..f728b40 100644 --- a/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java @@ -58,13 +58,7 @@ public interface OptionKeys { String APPLICATION_TYPE = "application.type"; /** - * Time in milliseconds to wait after forking any in-AM - * process before attempting to start up the containers: {@value} - * - * A shorter value brings the cluster up faster, but means that if the - * in AM process fails (due to a bad configuration), then time - * is wasted starting containers on a cluster that isn't going to come - * up + * Name of the cluster/application instance */ String APPLICATION_NAME = "application.name"; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index 419a454..1fa35f3 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -127,6 +127,7 @@ public class AgentProviderService extends AbstractProviderService implements private HeartbeatMonitor monitor; private Boolean canAnyMasterPublish = null; private AgentLaunchParameter agentLaunchParameter = null; + private String clusterName = null; private final Map<String, ComponentInstanceState> componentStatuses = new ConcurrentHashMap<>(); private final Map<String, Map<String, String>> componentInstanceData = new ConcurrentHashMap<>(); @@ -552,6 +553,13 @@ public class AgentProviderService extends AbstractProviderService implements return this.heartbeatMonitorInterval; } + private String getClusterName() { + if (clusterName == null || clusterName.length() == 0) { + clusterName = getAmState().getInternalsSnapshot().get(OptionKeys.APPLICATION_NAME); + } + return clusterName; + } + /** * Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site * @param name @@ -887,11 +895,10 @@ public class AgentProviderService extends AbstractProviderService implements throws SliderException { assert getAmState().isApplicationLive(); ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); - ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); ExecutionCommand cmd = new ExecutionCommand(AgentCommandType.EXECUTION_COMMAND); prepareExecutionCommand(cmd); - String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String clusterName = getClusterName(); cmd.setClusterName(clusterName); cmd.setRoleCommand(Command.INSTALL.toString()); cmd.setServiceName(clusterName); @@ -964,10 +971,9 @@ public class AgentProviderService extends AbstractProviderService implements throws 
SliderException { assert getAmState().isApplicationLive(); ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); - ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); StatusCommand cmd = new StatusCommand(); - String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String clusterName = getClusterName(); cmd.setCommandType(AgentCommandType.STATUS_COMMAND); cmd.setComponentName(roleName); @@ -993,10 +999,9 @@ public class AgentProviderService extends AbstractProviderService implements protected void addGetConfigCommand(String roleName, String containerId, HeartBeatResponse response) throws SliderException { assert getAmState().isApplicationLive(); - ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); StatusCommand cmd = new StatusCommand(); - String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String clusterName = getClusterName(); cmd.setCommandType(AgentCommandType.STATUS_COMMAND); cmd.setComponentName(roleName); @@ -1116,7 +1121,7 @@ public class AgentProviderService extends AbstractProviderService implements Map<String, String> tokens, String containerId) { Map<String, String> config = new HashMap<>(); if (configName.equals(GLOBAL_CONFIG_TAG)) { - addDefaultGlobalConfig(config); + addDefaultGlobalConfig(config, containerId); } // add role hosts to tokens addRoleRelatedTokens(tokens); @@ -1161,10 +1166,11 @@ public class AgentProviderService extends AbstractProviderService implements return hosts; } - private void addDefaultGlobalConfig(Map<String, String> config) { - config.put("app_log_dir", "${AGENT_LOG_ROOT}/app/log"); + private void addDefaultGlobalConfig(Map<String, String> config, String containerId) { + config.put("app_log_dir", "${AGENT_LOG_ROOT}"); config.put("app_pid_dir", "${AGENT_WORK_ROOT}/app/run"); config.put("app_install_dir", "${AGENT_WORK_ROOT}/app/install"); + config.put("app_container_id", containerId); } private void buildRoleHostDetails(Map<String, String> 
details) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java index 9dba87c..57e7e3f 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java @@ -28,6 +28,7 @@ public class Component { String publishConfig; String minInstanceCount; String maxInstanceCount; + String autoStartOnFailure; CommandScript commandScript; List<Export> exports; @@ -60,6 +61,14 @@ public class Component { this.publishConfig = publishConfig; } + public String getAutoStartOnFailure() { + return autoStartOnFailure; + } + + public void setAutoStartOnFailure(String autoStartOnFailure) { + this.autoStartOnFailure = autoStartOnFailure; + } + public String getMinInstanceCount() { return minInstanceCount; } @@ -92,6 +101,10 @@ public class Component { return exports; } + public Boolean getRequiresAutoRestart() { + return Boolean.parseBoolean(this.autoStartOnFailure); + } + @Override public String toString() { final StringBuilder sb = http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java index deb7a6a..23134b6 100644 --- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java @@ -39,6 +39,7 @@ public class MetainfoParser { digester.addBeanPropertySetter("*/application/comment"); digester.addBeanPropertySetter("*/application/version"); digester.addBeanPropertySetter("*/application/exportedConfigs"); + digester.addBeanPropertySetter("*/application/autoStartOnFailure"); digester.addObjectCreate("*/commandOrder", CommandOrder.class); digester.addBeanPropertySetter("*/commandOrder/command"); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b36c56ac/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java index d16603f..b5523d6 100644 --- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java @@ -136,6 +136,7 @@ public class TestAgentProviderService { + " <component>\n" + " <name>HBASE_MASTER</name>\n" + " <category>MASTER</category>\n" + + " <autoStartOnFailure>true</autoStartOnFailure>\n" + " <minInstanceCount>1</minInstanceCount>\n" + " <maxInstanceCount>2</maxInstanceCount>\n" + " <commandScript>\n" @@ -148,6 +149,7 @@ public class TestAgentProviderService { + " <name>HBASE_REGIONSERVER</name>\n" + " <category>SLAVE</category>\n" + " <minInstanceCount>1</minInstanceCount>\n" + + " <autoStartOnFailure>Falsee</autoStartOnFailure>\n" + " <commandScript>\n" + " <script>scripts/hbase_regionserver.py</script>\n" + " <scriptType>PYTHON</scriptType>\n" @@ -462,6 +464,8 @@ public class 
TestAgentProviderService { int found = 0; for (Component component : components) { if (component.getName().equals("HBASE_MASTER")) { + Assert.assertEquals(component.getAutoStartOnFailure(), "true"); + Assert.assertEquals(component.getRequiresAutoRestart(), Boolean.TRUE); Assert.assertEquals(component.getMinInstanceCount(), "1"); Assert.assertEquals(component.getMaxInstanceCount(), "2"); Assert.assertEquals(component.getCommandScript().getScript(), "scripts/hbase_master.py"); @@ -470,6 +474,8 @@ public class TestAgentProviderService { found++; } if (component.getName().equals("HBASE_REGIONSERVER")) { + Assert.assertEquals(component.getAutoStartOnFailure(), "Falsee"); + Assert.assertEquals(component.getRequiresAutoRestart(), Boolean.FALSE); Assert.assertEquals(component.getMinInstanceCount(), "1"); Assert.assertNull(component.getMaxInstanceCount()); Assert.assertEquals(component.getCommandScript().getScript(), "scripts/hbase_regionserver.py");