SLIDER-181. Slider support for Storm application package
SLIDER-262. Slider agent should provide process supervision such as auto-restart
SLIDER-123. Status check for Storm on YARN needs to be robust
SLIDER-301. Add support for default status check through pid file
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/004fc8df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/004fc8df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/004fc8df
Branch: refs/heads/feature/SLIDER-151_REST_API
Commit: 004fc8dfdfde58c04297d4001e861a0f8e93d69e
Parents: 966c8c8
Author: Sumit Mohanty <smoha...@hortonworks.com>
Authored: Tue Aug 12 14:37:12 2014 -0700
Committer: Sumit Mohanty <smoha...@hortonworks.com>
Committed: Thu Aug 14 17:54:57 2014 -0700
----------------------------------------------------------------------
 app-packages/memcached-win/appConfig.json | 2 +-
 .../memcached-win/package/scripts/memcached.py | 5 +++-
 .../memcached-win/package/scripts/params.py | 1 +
 app-packages/memcached/appConfig.json | 1 +
 .../memcached/package/scripts/memcached.py | 5 +++-
 .../memcached/package/scripts/params.py | 2 ++
 app-packages/storm/appConfig.json | 6 ++--
 app-packages/storm/metainfo.xml | 12 ++++++++
 app-packages/storm/package/scripts/service.py | 24 ++++++++-------
 .../storm/package/scripts/status_params.py | 1 +
 .../src/main/python/agent/Controller.py | 3 +-
 .../python/agent/CustomServiceOrchestrator.py | 31 +++++++++++++++++++-
 .../core/providers/system.py | 3 +-
 .../core/providers/windows/system.py | 11 +++++--
 .../core/resources/system.py | 4 +++
 .../python/resource_management/core/shell.py | 14 +++++----
 .../agent/TestCustomServiceOrchestrator.py | 8 +++++
 .../java/org/apache/slider/api/OptionKeys.java | 1 -
 .../slider/common/tools/CoreFileSystem.java | 3 --
 .../providers/agent/AgentClientProvider.java | 6 ----
 .../providers/agent/AgentProviderService.java | 24 +++++++++------
 .../agent/application/metadata/Component.java | 13 ++++++++
 .../application/metadata/MetainfoParser.java | 1 +
 .../agent/TestAgentProviderService.java | 6 ++++
 24 files changed, 140 insertions(+), 47 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/memcached-win/appConfig.json ---------------------------------------------------------------------- diff --git a/app-packages/memcached-win/appConfig.json b/app-packages/memcached-win/appConfig.json index 63835ec..b76ecde 100644 --- a/app-packages/memcached-win/appConfig.json +++ b/app-packages/memcached-win/appConfig.json @@ -8,7 +8,7 @@ "site.global.app_user": "hadoop", "site.global.app_root": "${AGENT_WORK_ROOT}\\app\\install", - + "site.global.pid_file": "${AGENT_WORK_ROOT}\\app\\run\\component.pid", "site.global.additional_cp": "C:\\hdp\\hadoop-2.4.0.2.1.3.0-1990\\share\\hadoop\\common\\lib\\*", "site.global.xmx_val": "256m", "site.global.xms_val": "128m", http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/memcached-win/package/scripts/memcached.py ---------------------------------------------------------------------- diff --git a/app-packages/memcached-win/package/scripts/memcached.py b/app-packages/memcached-win/package/scripts/memcached.py index 1bb05dc..bc9905d 100644 --- a/app-packages/memcached-win/package/scripts/memcached.py +++ b/app-packages/memcached-win/package/scripts/memcached.py @@ -39,7 +39,8 @@ class Memcached(Script): Execute(process_cmd, user=params.app_user, logoutput=False, - wait_for_finish=False + wait_for_finish=False, + pid_file=params.pid_file ) def stop(self, env): @@ -49,6 +50,8 @@ class Memcached(Script): def status(self, env): import params env.set_params(params) + #Check process status need to be changed for Windows + #check_process_status(params.pid_file) if __name__ == "__main__": Memcached().execute() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/memcached-win/package/scripts/params.py ---------------------------------------------------------------------- diff --git 
a/app-packages/memcached-win/package/scripts/params.py b/app-packages/memcached-win/package/scripts/params.py index f8e64da..fab3714 100644 --- a/app-packages/memcached-win/package/scripts/params.py +++ b/app-packages/memcached-win/package/scripts/params.py @@ -26,6 +26,7 @@ config = Script.get_config() app_root = config['configurations']['global']['app_root'] java64_home = config['hostLevelParams']['java_home'] app_user = config['configurations']['global']['app_user'] +pid_file = config['configurations']['global']['pid_file'] additional_cp = config['configurations']['global']['additional_cp'] xmx_val = config['configurations']['global']['xmx_val'] xms_val = config['configurations']['global']['xms_val'] http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/memcached/appConfig.json ---------------------------------------------------------------------- diff --git a/app-packages/memcached/appConfig.json b/app-packages/memcached/appConfig.json index b179723..5f32030 100644 --- a/app-packages/memcached/appConfig.json +++ b/app-packages/memcached/appConfig.json @@ -8,6 +8,7 @@ "site.global.app_user": "yarn", "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/jmemcached-1.0.0", + "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/component.pid", "site.global.additional_cp": "/usr/lib/hadoop/lib/*", "site.global.xmx_val": "256m", http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/memcached/package/scripts/memcached.py ---------------------------------------------------------------------- diff --git a/app-packages/memcached/package/scripts/memcached.py b/app-packages/memcached/package/scripts/memcached.py index cf04cf1..6e14e86 100644 --- a/app-packages/memcached/package/scripts/memcached.py +++ b/app-packages/memcached/package/scripts/memcached.py @@ -24,6 +24,7 @@ from resource_management import * class Memcached(Script): def install(self, env): self.install_packages(env) + pass def configure(self, env): 
import params @@ -38,7 +39,8 @@ class Memcached(Script): Execute(process_cmd, user=params.app_user, logoutput=False, - wait_for_finish=False + wait_for_finish=False, + pid_file=params.pid_file ) def stop(self, env): @@ -48,6 +50,7 @@ class Memcached(Script): def status(self, env): import params env.set_params(params) + check_process_status(params.pid_file) if __name__ == "__main__": Memcached().execute() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/memcached/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/app-packages/memcached/package/scripts/params.py b/app-packages/memcached/package/scripts/params.py index f8e64da..25b4055 100644 --- a/app-packages/memcached/package/scripts/params.py +++ b/app-packages/memcached/package/scripts/params.py @@ -26,6 +26,8 @@ config = Script.get_config() app_root = config['configurations']['global']['app_root'] java64_home = config['hostLevelParams']['java_home'] app_user = config['configurations']['global']['app_user'] +pid_file = config['configurations']['global']['pid_file'] + additional_cp = config['configurations']['global']['additional_cp'] xmx_val = config['configurations']['global']['xmx_val'] xms_val = config['configurations']['global']['xms_val'] http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/storm/appConfig.json ---------------------------------------------------------------------- diff --git a/app-packages/storm/appConfig.json b/app-packages/storm/appConfig.json index 6d6aa3a..24078cf 100644 --- a/app-packages/storm/appConfig.json +++ b/app-packages/storm/appConfig.json @@ -3,7 +3,7 @@ "metadata": { }, "global": { - "application.def": "/slider/storm_v091.zip", + "application.def": "package/storm_v091.zip", "config_types": "storm-site", "java_home": "/usr/jdk64/jdk1.7.0_45", "package_list": "files/apache-storm-0.9.1.2.1.1.0-237.tar.gz", @@ -68,7 +68,7 @@ 
"site.storm-site.logviewer.appender.name": "A1", "site.storm-site.nimbus.host": "${NIMBUS_HOST}", "site.storm-site.ui.port": "${STORM_UI_SERVER.ALLOCATED_PORT}", - "site.storm-site.supervisor.slots.ports": "[${SUPERVISOR.ALLOCATED_PORT}]", + "site.storm-site.supervisor.slots.ports": "[${SUPERVISOR.ALLOCATED_PORT}{DO_NOT_PROPAGATE},${SUPERVISOR.ALLOCATED_PORT}{DO_NOT_PROPAGATE}]", "site.storm-site.nimbus.file.copy.expiration.secs": "600", "site.storm-site.supervisor.monitor.frequency.secs": "3", "site.storm-site.transactional.zookeeper.servers": "null", @@ -105,7 +105,7 @@ "site.storm-site.topology.trident.batch.emit.interval.millis": "500", "site.storm-site.topology.builtin.metrics.bucket.size.secs": "60", "site.storm-site.storm.thrift.transport": "backtype.storm.security.auth.SimpleTransportPlugin", - "site.storm-site.logviewer.port": "0", + "site.storm-site.logviewer.port": "${SUPERVISOR.ALLOCATED_PORT}{DO_NOT_PROPAGATE}", "site.storm-site.topology.debug": "false" }, "components": { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/storm/metainfo.xml ---------------------------------------------------------------------- diff --git a/app-packages/storm/metainfo.xml b/app-packages/storm/metainfo.xml index 7edd794..e94bc62 100644 --- a/app-packages/storm/metainfo.xml +++ b/app-packages/storm/metainfo.xml @@ -22,6 +22,7 @@ <name>STORM</name> <comment>Apache Hadoop Stream processing framework</comment> <version>0.9.1.2.1</version> + <exportedConfigs>storm-site</exportedConfigs> <exportGroups> <exportGroup> @@ -76,6 +77,7 @@ <component> <name>NIMBUS</name> <category>MASTER</category> + <autoStartOnFailure>true</autoStartOnFailure> <commandScript> <script>scripts/nimbus.py</script> <scriptType>PYTHON</scriptType> @@ -86,6 +88,7 @@ <component> <name>STORM_REST_API</name> <category>MASTER</category> + <autoStartOnFailure>true</autoStartOnFailure> <commandScript> <script>scripts/rest_api.py</script> <scriptType>PYTHON</scriptType> @@ -96,6 
+99,13 @@ <component> <name>SUPERVISOR</name> <category>SLAVE</category> + <autoStartOnFailure>true</autoStartOnFailure> + <exports> + <export> + <name>log_viewer_port</name> + <value>${THIS_HOST}:${site.storm-site.logviewer.port}</value> + </export> + </exports> <commandScript> <script>scripts/supervisor.py</script> <scriptType>PYTHON</scriptType> @@ -107,6 +117,7 @@ <name>STORM_UI_SERVER</name> <category>MASTER</category> <publishConfig>true</publishConfig> + <autoStartOnFailure>true</autoStartOnFailure> <commandScript> <script>scripts/ui_server.py</script> <scriptType>PYTHON</scriptType> @@ -117,6 +128,7 @@ <component> <name>DRPC_SERVER</name> <category>MASTER</category> + <autoStartOnFailure>true</autoStartOnFailure> <commandScript> <script>scripts/drpc_server.py</script> <scriptType>PYTHON</scriptType> http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/storm/package/scripts/service.py ---------------------------------------------------------------------- diff --git a/app-packages/storm/package/scripts/service.py b/app-packages/storm/package/scripts/service.py index 10fa5b9..13fcef2 100644 --- a/app-packages/storm/package/scripts/service.py +++ b/app-packages/storm/package/scripts/service.py @@ -22,7 +22,9 @@ limitations under the License. 
from resource_management import * import time - +""" +Slider package uses jps as pgrep does not list the whole process start command +""" def service( name, action='start'): @@ -30,25 +32,25 @@ def service( import status_params pid_file = status_params.pid_files[name] + container_id = status_params.container_id no_op_test = format("ls {pid_file} >/dev/null 2>&1 && ps `cat {pid_file}` >/dev/null 2>&1") + jps_path = format("{java64_home}/bin/jps") - grep_and_awk = "| grep -v grep | awk '{print $1}'" + grep_and_awk = format("| grep {container_id}") + " | awk '{print $1}'" if name == 'ui': - #process_cmd = "^java.+backtype.storm.ui.core$" - pid_chk_cmd = format("{jps_path} -vl | grep \"^[0-9 ]*backtype.storm.ui.core\" {grep_and_awk} > {pid_file}") + crt_pid_cmd = format("{jps_path} -vl | grep \"^[0-9 ]*backtype.storm.ui.core\" {grep_and_awk} > {pid_file}") elif name == "rest_api": - process_cmd = format("{java64_home}/bin/java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server") - crt_pid_cmd = format("pgrep -f \"{process_cmd}\" && pgrep -f \"{process_cmd}\" > {pid_file}") + rest_process_cmd = format("{java64_home}/bin/java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server") + crt_pid_cmd = format("pgrep -f \"{rest_process_cmd}\" > {pid_file}") else: - #process_cmd = format("^java.+backtype.storm.daemon.{name}$") - pid_chk_cmd = format("{jps_path} -vl | grep \"^[0-9 ]*backtype.storm.daemon.{name}\" {grep_and_awk} > {pid_file}") + crt_pid_cmd = format("{jps_path} -vl | grep \"^[0-9 ]*backtype.storm.daemon.{name}\" {grep_and_awk} > {pid_file}") if action == "start": if name == "rest_api": - cmd = format("{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log") + cmd = format("{rest_process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log") else: - cmd = format("env JAVA_HOME={java64_home} PATH=$PATH:{java64_home}/bin STORM_BASE_DIR={app_root} STORM_CONF_DIR={conf_dir} {storm_bin} {name}") + cmd = 
format("env JAVA_HOME={java64_home} PATH=$PATH:{java64_home}/bin STORM_BASE_DIR={app_root} STORM_CONF_DIR={conf_dir} {storm_bin} {name} > {log_dir}/{name}.out 2>&1") Execute(cmd, not_if=no_op_test, @@ -67,7 +69,7 @@ def service( else: content = None for i in xrange(12): - Execute(pid_chk_cmd, + Execute(crt_pid_cmd, user=params.storm_user, logoutput=True ) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/app-packages/storm/package/scripts/status_params.py ---------------------------------------------------------------------- diff --git a/app-packages/storm/package/scripts/status_params.py b/app-packages/storm/package/scripts/status_params.py index eab83cf..5907446 100644 --- a/app-packages/storm/package/scripts/status_params.py +++ b/app-packages/storm/package/scripts/status_params.py @@ -21,6 +21,7 @@ from resource_management import * config = Script.get_config() +container_id = config['configurations']['global']['app_container_id'] pid_dir = config['configurations']['global']['app_pid_dir'] pid_nimbus = format("{pid_dir}/nimbus.pid") pid_supervisor = format("{pid_dir}/supervisor.pid") http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 4a103cb..1a01bf8 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -237,7 +237,8 @@ class Controller(threading.Thread): return if serverId != self.responseId + 1: - logger.error("Error in responseId sequence - restarting") + logger.error("Error in responseId sequence expected " + str(self.responseId + 1) + + " but got " + str(serverId) + " - restarting") self.restartAgent() else: self.responseId = serverId 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py index 0f4acc6..0078c9c 100644 --- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py +++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py @@ -259,7 +259,7 @@ class CustomServiceOrchestrator(): elif allocated_for_any in value: ## All unallocated ports should be set to 0 logger.info("Assigning port 0 " + "for " + value) - value = "0" + value = self.set_all_unallocated_ports(value) command['configurations'][key][k] = value pass pass @@ -274,6 +274,35 @@ class CustomServiceOrchestrator(): pass """ + All unallocated ports should be set to 0 + Look for "${SOME_COMPONENT_NAME.ALLOCATED_PORT}" + or "${component.ALLOCATED_PORT}{DEFAULT_port}" + or "${component.ALLOCATED_PORT}{DEFAULT_port}{DO_NOT_PROPAGATE}" + """ + + def set_all_unallocated_ports(self, value): + pattern_start = "${" + sub_section_start = "}{" + pattern_end = "}" + index = value.find(pattern_start) + while index != -1: + replace_index_start = index + replace_index_end = value.find(pattern_end, replace_index_start) + next_pattern_start = value.find(sub_section_start, replace_index_start) + while next_pattern_start == replace_index_end: + replace_index_end = value.find(pattern_end, replace_index_end + 1) + next_pattern_start = value.find(sub_section_start, next_pattern_start + 1) + pass + + value = value[:replace_index_start] + "0" + value[replace_index_end + 1:] + + # look for the next + index = value.find(pattern_start) + + return value + pass + + """ Port allocation can asks for multiple dynamic ports port_req_pattern is of type ${component_name.ALLOCATED_PORT} append {DEFAULT_ and find the default value 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-agent/src/main/python/resource_management/core/providers/system.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/core/providers/system.py b/slider-agent/src/main/python/resource_management/core/providers/system.py index e273498..6969c62 100644 --- a/slider-agent/src/main/python/resource_management/core/providers/system.py +++ b/slider-agent/src/main/python/resource_management/core/providers/system.py @@ -242,7 +242,8 @@ class ExecuteProvider(Provider): shell.checked_call(self.resource.command, logoutput=self.resource.logoutput, cwd=self.resource.cwd, env=self.resource.environment, preexec_fn=_preexec_fn(self.resource), user=self.resource.user, - wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout) + wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout, + pid_file=self.resource.pid_file) break except Fail as ex: if i == self.resource.tries-1: # last try http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-agent/src/main/python/resource_management/core/providers/windows/system.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/core/providers/windows/system.py b/slider-agent/src/main/python/resource_management/core/providers/windows/system.py index fa29d7d..f0d4825 100644 --- a/slider-agent/src/main/python/resource_management/core/providers/windows/system.py +++ b/slider-agent/src/main/python/resource_management/core/providers/windows/system.py @@ -30,12 +30,16 @@ import subprocess import shutil -def _call_command(command, logoutput=False, cwd=None, env=None, wait_for_finish=True, timeout=None): - # TODO implement timeout, logoutput, wait_for_finish +def _call_command(command, logoutput=False, cwd=None, env=None, wait_for_finish=True, timeout=None, 
pid_file_name=None): + # TODO implement logoutput Logger.info("Executing %s" % (command)) proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd, env=env, shell=False) if not wait_for_finish: + if pid_file_name: + pidfile = open(pid_file_name, 'w') + pidfile.write(str(proc.pid)) + pidfile.close() return None, None if timeout: @@ -135,7 +139,8 @@ class ExecuteProvider(Provider): try: _call_command(self.resource.command, logoutput=self.resource.logoutput, cwd=self.resource.cwd, env=self.resource.environment, - wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout) + wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout, + pid_file_name=self.resource.pid_file) break except Fail as ex: if i == self.resource.tries - 1: # last try http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-agent/src/main/python/resource_management/core/resources/system.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/core/resources/system.py b/slider-agent/src/main/python/resource_management/core/resources/system.py index 2c832a4..a63d993 100644 --- a/slider-agent/src/main/python/resource_management/core/resources/system.py +++ b/slider-agent/src/main/python/resource_management/core/resources/system.py @@ -101,6 +101,10 @@ class Execute(Resource): - try_sleep """ wait_for_finish = BooleanArgument(default=True) + """ + if wait_for_finish is True then optionally the caller can ask for the pid to be written + """ + pid_file = ResourceArgument() class ExecuteScript(Resource): http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-agent/src/main/python/resource_management/core/shell.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/core/shell.py 
b/slider-agent/src/main/python/resource_management/core/shell.py index 92312d5..fb2c946 100644 --- a/slider-agent/src/main/python/resource_management/core/shell.py +++ b/slider-agent/src/main/python/resource_management/core/shell.py @@ -31,15 +31,15 @@ from exceptions import ExecuteTimeoutException from resource_management.core.logger import Logger def checked_call(command, logoutput=False, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None): - return _call(command, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout) + cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, pid_file=None): + return _call(command, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, pid_file) def call(command, logoutput=False, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None): - return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout) + cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, pid_file=None): + return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, pid_file) def _call(command, logoutput=False, throw_on_failure=True, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None): + cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, pid_file_name=None): """ Execute shell command @@ -67,6 +67,10 @@ def _call(command, logoutput=False, throw_on_failure=True, preexec_fn=preexec_fn) if not wait_for_finish: + if pid_file_name: + pidfile = open(pid_file_name, 'w') + pidfile.write(str(proc.pid)) + pidfile.close() return None, None if timeout: http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git 
a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py index c07f05f..48e4df4 100644 --- a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py +++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py @@ -484,12 +484,20 @@ class TestCustomServiceOrchestrator(TestCase): command['configurations']['oozie-site'] = {} command['configurations']['oozie-site']['log_root'] = "${AGENT_LOG_ROOT}" command['configurations']['oozie-site']['a_port'] = "${HBASE_MASTER.ALLOCATED_PORT}" + command['configurations']['oozie-site']['ignore_port1'] = "[${HBASE_RS.ALLOCATED_PORT}]" + command['configurations']['oozie-site']['ignore_port2'] = "[${HBASE_RS.ALLOCATED_PORT},${HBASE_REST.ALLOCATED_PORT}{DO_NOT_PROPAGATE}]" + command['configurations']['oozie-site']['ignore_port3'] = "[${HBASE_RS.ALLOCATED_PORT}{a}{b}{c},${A.ALLOCATED_PORT}{DO_NOT_PROPAGATE},${A.ALLOCATED_PORT}{DEFAULT_3}{DO_NOT_PROPAGATE}]" + command['configurations']['oozie-site']['ignore_port4'] = "${HBASE_RS}{a}{b}{c}" allocated_ports = {} orchestrator.finalize_command(command, False, allocated_ports) self.assertEqual(command['configurations']['hbase-site']['work_root'], tempWorkDir) self.assertEqual(command['configurations']['oozie-site']['log_root'], tempdir) self.assertEqual(command['configurations']['oozie-site']['a_port'], "10023") + self.assertEqual(command['configurations']['oozie-site']['ignore_port1'], "[0]") + self.assertEqual(command['configurations']['oozie-site']['ignore_port2'], "[0,0]") + self.assertEqual(command['configurations']['oozie-site']['ignore_port3'], "[0,0,0]") + self.assertEqual(command['configurations']['oozie-site']['ignore_port4'], "${HBASE_RS}{a}{b}{c}") self.assertEqual(orchestrator.applied_configs, {}) self.assertEqual(len(allocated_ports), 1) self.assertTrue('oozie-site.a_port' in allocated_ports) 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java b/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java index 320cc4f..a035a99 100644 --- a/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java @@ -35,7 +35,6 @@ public interface OptionKeys extends InternalKeys { */ String APPLICATION_TYPE = "application.type"; - String APPLICATION_NAME = "application.name"; /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java b/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java index 2381a82..a145b8f 100644 --- a/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java +++ b/slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java @@ -48,11 +48,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.charset.Charset; -import java.util.Enumeration; import java.util.HashMap; import java.util.Map; -import java.util.zip.ZipEntry; -import java.util.zip.ZipFile; import static org.apache.slider.common.SliderXmlConfKeys.CLUSTER_DIRECTORY_PERMISSIONS; import static org.apache.slider.common.SliderXmlConfKeys.DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java index 8f92ffa..3a1ee76 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java @@ -92,12 +92,6 @@ public class AgentClientProvider extends AbstractClientProvider getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF); Path appDefPath = new Path(appDef); sliderFileSystem.verifyFileExists(appDefPath); - - String pkgList = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.PACKAGE_LIST, null); - if (pkgList != null) { - sliderFileSystem.verifyFileExistsInZip(appDefPath, pkgList); - } String agentConf = instanceDefinition.getAppConfOperations(). getGlobalOptions().getOption(AgentKeys.AGENT_CONF, ""); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index 3225066..559c4e7 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -131,6 +131,7 @@ public class AgentProviderService extends AbstractProviderService implements private HeartbeatMonitor monitor; private Boolean canAnyMasterPublish = null; private AgentLaunchParameter agentLaunchParameter = null; + private String clusterName = null; private final Map<String, ComponentInstanceState> componentStatuses = new ConcurrentHashMap<String, ComponentInstanceState>(); @@ -629,6 +630,13 @@ public 
class AgentProviderService extends AbstractProviderService implements return this.heartbeatMonitorInterval; } + private String getClusterName() { + if (clusterName == null || clusterName.length() == 0) { + clusterName = getAmState().getInternalsSnapshot().get(OptionKeys.APPLICATION_NAME); + } + return clusterName; + } + /** * Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site * @param name @@ -973,11 +981,10 @@ public class AgentProviderService extends AbstractProviderService implements throws SliderException { assert getAmState().isApplicationLive(); ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); - ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); ExecutionCommand cmd = new ExecutionCommand(AgentCommandType.EXECUTION_COMMAND); prepareExecutionCommand(cmd); - String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String clusterName = getClusterName(); cmd.setClusterName(clusterName); cmd.setRoleCommand(Command.INSTALL.toString()); cmd.setServiceName(clusterName); @@ -1050,10 +1057,9 @@ public class AgentProviderService extends AbstractProviderService implements throws SliderException { assert getAmState().isApplicationLive(); ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); - ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); StatusCommand cmd = new StatusCommand(); - String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String clusterName = getClusterName(); cmd.setCommandType(AgentCommandType.STATUS_COMMAND); cmd.setComponentName(roleName); @@ -1079,10 +1085,9 @@ public class AgentProviderService extends AbstractProviderService implements protected void addGetConfigCommand(String roleName, String containerId, HeartBeatResponse response) throws SliderException { assert getAmState().isApplicationLive(); - ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); StatusCommand cmd = new 
StatusCommand(); - String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String clusterName = getClusterName(); cmd.setCommandType(AgentCommandType.STATUS_COMMAND); cmd.setComponentName(roleName); @@ -1203,7 +1208,7 @@ public class AgentProviderService extends AbstractProviderService implements Map<String, String> tokens, String containerId) { Map<String, String> config = new HashMap<String, String>(); if (configName.equals(GLOBAL_CONFIG_TAG)) { - addDefaultGlobalConfig(config); + addDefaultGlobalConfig(config, containerId); } // add role hosts to tokens addRoleRelatedTokens(tokens); @@ -1248,10 +1253,11 @@ public class AgentProviderService extends AbstractProviderService implements return hosts; } - private void addDefaultGlobalConfig(Map<String, String> config) { - config.put("app_log_dir", "${AGENT_LOG_ROOT}/app/log"); + private void addDefaultGlobalConfig(Map<String, String> config, String containerId) { + config.put("app_log_dir", "${AGENT_LOG_ROOT}"); config.put("app_pid_dir", "${AGENT_WORK_ROOT}/app/run"); config.put("app_install_dir", "${AGENT_WORK_ROOT}/app/install"); + config.put("app_container_id", containerId); } private void buildRoleHostDetails(Map<String, String> details) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java index f7b77ff..acf6ecb 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java @@ -28,6 +28,7 @@ public class Component { String publishConfig; String minInstanceCount; String 
maxInstanceCount; + String autoStartOnFailure; CommandScript commandScript; List<Export> exports; @@ -60,6 +61,14 @@ public class Component { this.publishConfig = publishConfig; } + public String getAutoStartOnFailure() { + return autoStartOnFailure; + } + + public void setAutoStartOnFailure(String autoStartOnFailure) { + this.autoStartOnFailure = autoStartOnFailure; + } + public String getMinInstanceCount() { return minInstanceCount; } @@ -92,6 +101,10 @@ public class Component { return exports; } + public Boolean getRequiresAutoRestart() { + return Boolean.parseBoolean(this.autoStartOnFailure); + } + @Override public String toString() { final StringBuilder sb = http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java index deb7a6a..23134b6 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java @@ -39,6 +39,7 @@ public class MetainfoParser { digester.addBeanPropertySetter("*/application/comment"); digester.addBeanPropertySetter("*/application/version"); digester.addBeanPropertySetter("*/application/exportedConfigs"); + digester.addBeanPropertySetter("*/application/autoStartOnFailure"); digester.addObjectCreate("*/commandOrder", CommandOrder.class); digester.addBeanPropertySetter("*/commandOrder/command"); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/004fc8df/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java 
---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java index ee61129..a68e770 100644 --- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java @@ -134,6 +134,7 @@ public class TestAgentProviderService { + " <component>\n" + " <name>HBASE_MASTER</name>\n" + " <category>MASTER</category>\n" + + " <autoStartOnFailure>true</autoStartOnFailure>\n" + " <minInstanceCount>1</minInstanceCount>\n" + " <maxInstanceCount>2</maxInstanceCount>\n" + " <commandScript>\n" @@ -146,6 +147,7 @@ public class TestAgentProviderService { + " <name>HBASE_REGIONSERVER</name>\n" + " <category>SLAVE</category>\n" + " <minInstanceCount>1</minInstanceCount>\n" + + " <autoStartOnFailure>Falsee</autoStartOnFailure>\n" + " <commandScript>\n" + " <script>scripts/hbase_regionserver.py</script>\n" + " <scriptType>PYTHON</scriptType>\n" @@ -469,6 +471,8 @@ public class TestAgentProviderService { int found = 0; for (Component component : components) { if (component.getName().equals("HBASE_MASTER")) { + Assert.assertEquals(component.getAutoStartOnFailure(), "true"); + Assert.assertEquals(component.getRequiresAutoRestart(), Boolean.TRUE); Assert.assertEquals(component.getMinInstanceCount(), "1"); Assert.assertEquals(component.getMaxInstanceCount(), "2"); Assert.assertEquals(component.getCommandScript().getScript(), "scripts/hbase_master.py"); @@ -477,6 +481,8 @@ public class TestAgentProviderService { found++; } if (component.getName().equals("HBASE_REGIONSERVER")) { + Assert.assertEquals(component.getAutoStartOnFailure(), "Falsee"); + Assert.assertEquals(component.getRequiresAutoRestart(), Boolean.FALSE); Assert.assertEquals(component.getMinInstanceCount(), "1"); 
Assert.assertNull(component.getMaxInstanceCount()); Assert.assertEquals(component.getCommandScript().getScript(), "scripts/hbase_regionserver.py");