Repository: incubator-slider Updated Branches: refs/heads/releases/slider-0.60 b0a773cf3 -> 9641bd0c6
SLIDER-620. Execute command should allow polling for daemons that may stop within seconds Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/9641bd0c Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/9641bd0c Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/9641bd0c Branch: refs/heads/releases/slider-0.60 Commit: 9641bd0c6860d6e06b84adf6123c021249b2ed82 Parents: b0a773c Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Thu Nov 6 18:29:41 2014 -0800 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Thu Nov 6 18:31:55 2014 -0800 ---------------------------------------------------------------------- .../hbase-win/package/scripts/params.py | 16 +++++----- app-packages/memcached-win/metainfo.xml | 18 +++++++---- .../memcached-win/package/scripts/memcached.py | 6 ++-- .../memcached-win/package/scripts/params.py | 2 +- app-packages/memcached/README.txt | 11 ++++++- .../memcached/package/scripts/memcached.py | 3 +- .../python/resource_management/core/logger.py | 20 ++++++++++-- .../core/providers/system.py | 2 +- .../core/providers/windows/system.py | 33 +++++++++++++++----- .../core/resources/system.py | 1 + .../python/resource_management/core/shell.py | 29 ++++++++++++----- .../libraries/functions/check_process_status.py | 2 +- .../resource_management/TestExecuteResource.py | 25 +++++++++++++++ 13 files changed, 128 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/app-packages/hbase-win/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/app-packages/hbase-win/package/scripts/params.py b/app-packages/hbase-win/package/scripts/params.py index 90d274c..5a54e25 100644 --- a/app-packages/hbase-win/package/scripts/params.py +++ 
b/app-packages/hbase-win/package/scripts/params.py @@ -44,11 +44,11 @@ hbase_hdfs_root_dir = config['configurations']['hbase-site']['hbase.rootdir'] """ Read various ports """ -rest_port = default("configurations/global/hbase_rest_port", 1700) -thrift_port = default("configurations/global/hbase_thrift_port", 9090) -thrift2_port = default("configurations/global/hbase_thrift2_port", 9091) -thrift_info_port = default("configurations/global/hbase_info_thrift_port", 9095) -thrift2_info_port = default("configurations/global/hbase_info_thrift2_port", 9096) +rest_port = default("/configurations/global/hbase_rest_port", 1700) +thrift_port = default("/configurations/global/hbase_thrift_port", 9090) +thrift2_port = default("/configurations/global/hbase_thrift2_port", 9091) +thrift_info_port = default("/configurations/global/hbase_info_thrift_port", 9095) +thrift2_info_port = default("/configurations/global/hbase_info_thrift2_port", 9096) """ Compute or read various heap sizes @@ -59,9 +59,9 @@ regionserver_xmn_max = config['configurations']['hbase-env']['hbase_regionserver regionserver_xmn_percent = config['configurations']['hbase-env']['hbase_regionserver_xmn_ratio'] regionserver_xmn_size = calc_xmn_from_xms(regionserver_heapsize, regionserver_xmn_percent, regionserver_xmn_max) -restserver_heapsize = default("configurations/hbase-env/hbase_restserver_heapsize", "512m") -thriftserver_heapsize = default("configurations/hbase-env/hbase_thriftserver_heapsize", "512m") -thrift2server_heapsize = default("configurations/hbase-env/hbase_thrift2server_heapsize", "512m") +restserver_heapsize = default("/configurations/hbase-env/hbase_restserver_heapsize", "512m") +thriftserver_heapsize = default("/configurations/hbase-env/hbase_thriftserver_heapsize", "512m") +thrift2server_heapsize = default("/configurations/hbase-env/hbase_thrift2server_heapsize", "512m") hbase_env_sh_template = config['configurations']['hbase-env']['content'] java_library_path = 
config['configurations']['global']['java_library_path'] http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/app-packages/memcached-win/metainfo.xml ---------------------------------------------------------------------- diff --git a/app-packages/memcached-win/metainfo.xml b/app-packages/memcached-win/metainfo.xml index c7e5881..093001b 100644 --- a/app-packages/memcached-win/metainfo.xml +++ b/app-packages/memcached-win/metainfo.xml @@ -23,17 +23,23 @@ <comment>Memcache is a network accessible key/value storage system, often used as a distributed cache.</comment> <version>1.0.0</version> <exportedConfigs>None</exportedConfigs> + <exportGroups> + <exportGroup> + <name>Servers</name> + <exports> + <export> + <name>host_port</name> + <value>${MEMCACHED_HOST}:${site.global.listen_port}</value> + </export> + </exports> + </exportGroup> + </exportGroups> <components> <component> <name>MEMCACHED</name> <category>MASTER</category> - <componentExports> - <componentExport> - <name>host_port</name> - <value>${THIS_HOST}:${site.global.listen_port}</value> - </componentExport> - </componentExports> + <compExports>Servers-host_port</compExports> <commandScript> <script>scripts/memcached.py</script> <scriptType>PYTHON</scriptType> http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/app-packages/memcached-win/package/scripts/memcached.py ---------------------------------------------------------------------- diff --git a/app-packages/memcached-win/package/scripts/memcached.py b/app-packages/memcached-win/package/scripts/memcached.py index cd560dd..c272e47 100644 --- a/app-packages/memcached-win/package/scripts/memcached.py +++ b/app-packages/memcached-win/package/scripts/memcached.py @@ -39,7 +39,8 @@ class Memcached(Script): Execute(process_cmd, logoutput=False, wait_for_finish=False, - pid_file=params.pid_file + pid_file=params.pid_file, + poll_after = 5 ) def stop(self, env): @@ -49,8 +50,7 @@ class Memcached(Script): def status(self, env): 
import params env.set_params(params) - #Check process status need to be changed for Windows - #check_process_status(params.pid_file) + check_process_status(params.pid_file) if __name__ == "__main__": Memcached().execute() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/app-packages/memcached-win/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/app-packages/memcached-win/package/scripts/params.py b/app-packages/memcached-win/package/scripts/params.py index fab3714..056a3b9 100644 --- a/app-packages/memcached-win/package/scripts/params.py +++ b/app-packages/memcached-win/package/scripts/params.py @@ -25,8 +25,8 @@ config = Script.get_config() app_root = config['configurations']['global']['app_root'] java64_home = config['hostLevelParams']['java_home'] -app_user = config['configurations']['global']['app_user'] pid_file = config['configurations']['global']['pid_file'] + additional_cp = config['configurations']['global']['additional_cp'] xmx_val = config['configurations']['global']['xmx_val'] xms_val = config['configurations']['global']['xms_val'] http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/app-packages/memcached/README.txt ---------------------------------------------------------------------- diff --git a/app-packages/memcached/README.txt b/app-packages/memcached/README.txt index d9d8810..fc0e4f3 100644 --- a/app-packages/memcached/README.txt +++ b/app-packages/memcached/README.txt @@ -19,7 +19,16 @@ How to create a Slider app package for Memcached? To create the app package you will need the Memcached tarball copied to a specific location. -Replace the placeholder tarball for JMemcached. +Replace the placeholder tarball for JMemcached. The tarball must have all the jar files at the +root directory. 
+Example: + tar -tvf jmemcached-1.0.0.tar + -rw-r--r-- ./jmemcached-cli-1.0.0.jar + -rwxr-xr-x ./jmemcached-core-1.0.0.jar + +If not, modify appConfig.json to have the correct application install root. + "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/my_sub_root_for_jars", + cp ~/Downloads/jmemcached-1.0.0.tar package/files/ rm package/files/jmemcached-1.0.0.tar.REPLACE http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/app-packages/memcached/package/scripts/memcached.py ---------------------------------------------------------------------- diff --git a/app-packages/memcached/package/scripts/memcached.py b/app-packages/memcached/package/scripts/memcached.py index 897a993..986b61e 100644 --- a/app-packages/memcached/package/scripts/memcached.py +++ b/app-packages/memcached/package/scripts/memcached.py @@ -39,7 +39,8 @@ class Memcached(Script): Execute(process_cmd, logoutput=False, wait_for_finish=False, - pid_file=params.pid_file + pid_file=params.pid_file, + poll_after = 5 ) def stop(self, env): http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/slider-agent/src/main/python/resource_management/core/logger.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/core/logger.py b/slider-agent/src/main/python/resource_management/core/logger.py index b80042a..5d6e414 100644 --- a/slider-agent/src/main/python/resource_management/core/logger.py +++ b/slider-agent/src/main/python/resource_management/core/logger.py @@ -29,7 +29,15 @@ class Logger: # unprotected_strings : protected_strings map sensitive_strings = {} - + + @staticmethod + def error(text): + Logger.logger.error(Logger.get_protected_text(text)) + + @staticmethod + def warning(text): + Logger.logger.warning(Logger.get_protected_text(text)) + + @staticmethod def info(text): Logger.logger.info(Logger.get_protected_text(text)) @@ -39,6 +47,14 @@ class Logger:
Logger.logger.debug(Logger.get_protected_text(text)) @staticmethod + def error_resource(resource): + Logger.error(Logger.get_protected_text(Logger._get_resource_repr(resource))) + + @staticmethod + def warning_resource(resource): + Logger.warning(Logger.get_protected_text(Logger._get_resource_repr(resource))) + + @staticmethod def info_resource(resource): Logger.info(Logger.get_protected_text(Logger._get_resource_repr(resource))) @@ -92,4 +108,4 @@ class Logger: if arguments_str: arguments_str = arguments_str[:-2] - return "{0} {{{1}}}".format(resource, arguments_str) \ No newline at end of file + return unicode("{0} {{{1}}}").format(resource, arguments_str) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/slider-agent/src/main/python/resource_management/core/providers/system.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/core/providers/system.py b/slider-agent/src/main/python/resource_management/core/providers/system.py index f32ee3b..6f56967 100644 --- a/slider-agent/src/main/python/resource_management/core/providers/system.py +++ b/slider-agent/src/main/python/resource_management/core/providers/system.py @@ -253,7 +253,7 @@ class ExecuteProvider(Provider): cwd=self.resource.cwd, env=self.resource.environment, preexec_fn=_preexec_fn(self.resource), user=self.resource.user, wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout, - pid_file=self.resource.pid_file) + pid_file=self.resource.pid_file, poll_after=self.resource.poll_after) break except Fail as ex: if i == self.resource.tries-1: # last try http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/slider-agent/src/main/python/resource_management/core/providers/windows/system.py ---------------------------------------------------------------------- diff --git 
a/slider-agent/src/main/python/resource_management/core/providers/windows/system.py b/slider-agent/src/main/python/resource_management/core/providers/windows/system.py index 9514bdb..6167977 100644 --- a/slider-agent/src/main/python/resource_management/core/providers/windows/system.py +++ b/slider-agent/src/main/python/resource_management/core/providers/windows/system.py @@ -62,9 +62,11 @@ def _merge_env(env1, env2, merge_keys=['PYTHONPATH']): result_env[str(key)] = str(os.pathsep.join(set(all_values))) return result_env + # Execute command. As windows stack heavily relies on proper environment it is better to reload fresh environment # on every execution. env variable will me merged with fresh environment for user. -def _call_command(command, logoutput=False, cwd=None, env=None, wait_for_finish=True, timeout=None, user=None, pid_file_name=None): +def _call_command(command, logoutput=False, cwd=None, env=None, wait_for_finish=True, timeout=None, user=None, + pid_file_name=None, poll_after=None): # TODO implement user Logger.info("Executing %s" % (command)) proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -73,26 +75,41 @@ def _call_command(command, logoutput=False, cwd=None, env=None, wait_for_finish= if not wait_for_finish: Logger.debug("No need to wait for the process to exit. 
Will leave the process running ...") code = 0 + logAnyway = False if pid_file_name: Logger.debug("Writing the process id %s to file %s" % (str(proc.pid), pid_file_name)) pidfile = open(pid_file_name, 'w') pidfile.write(str(proc.pid)) pidfile.close() Logger.info("Wrote the process id to file %s" % pid_file_name) - return code, None, None + + ## wait poll_after seconds and poll + if poll_after: + time.sleep(poll_after) + if proc.poll() is None: + return code, None, None # if still running then return + else: + logAnyway = True # assume failure and log + Logger.warning("Process is not up after the polling interval " + str(poll_after) + " seconds.") + else: + return code, None, None if timeout: q = Queue() - t = threading.Timer( timeout, on_timeout, [proc, q] ) + t = threading.Timer(timeout, on_timeout, [proc, q]) t.start() out, err = proc.communicate() code = proc.returncode - if logoutput and out: - Logger.info(out) - if logoutput and err: - Logger.info(err) + if logoutput or logAnyway: + if out: + Logger.info("Out: " + str(out)) + if err: + Logger.info("Err: " + str(err)) + if code: + Logger.info("Ret Code: " + str(code)) + return code, out, err # see msdn Icacls doc for rights @@ -183,7 +200,7 @@ class ExecuteProvider(Provider): cwd=self.resource.cwd, env=self.resource.environment, wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout, user=self.resource.user, - pid_file_name=self.resource.pid_file) + pid_file_name=self.resource.pid_file, poll_after=self.resource.poll_after) if code != 0 and not self.resource.ignore_failures: raise Fail("Failed to execute " + self.resource.command) break http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/slider-agent/src/main/python/resource_management/core/resources/system.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/core/resources/system.py 
b/slider-agent/src/main/python/resource_management/core/resources/system.py index 3c3513b..f751d52 100644 --- a/slider-agent/src/main/python/resource_management/core/resources/system.py +++ b/slider-agent/src/main/python/resource_management/core/resources/system.py @@ -106,6 +106,7 @@ class Execute(Resource): if wait_for_finish is True then optionally the caller can ask for the pid to be written """ pid_file = ResourceArgument() + poll_after = ResourceArgument() #seconds class ExecuteScript(Resource): http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/slider-agent/src/main/python/resource_management/core/shell.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/core/shell.py b/slider-agent/src/main/python/resource_management/core/shell.py index fb2c946..95d18fc 100644 --- a/slider-agent/src/main/python/resource_management/core/shell.py +++ b/slider-agent/src/main/python/resource_management/core/shell.py @@ -29,17 +29,18 @@ from multiprocessing import Queue from exceptions import Fail from exceptions import ExecuteTimeoutException from resource_management.core.logger import Logger +import time def checked_call(command, logoutput=False, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, pid_file=None): - return _call(command, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, pid_file) + cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, pid_file=None, poll_after=None): + return _call(command, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, pid_file, poll_after) def call(command, logoutput=False, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, pid_file=None): - return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, pid_file) + cwd=None, env=None, preexec_fn=None, user=None, 
wait_for_finish=True, timeout=None, pid_file=None, poll_after=None): + return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, pid_file, poll_after) def _call(command, logoutput=False, throw_on_failure=True, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, pid_file_name=None): + cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, pid_file_name=None, poll_after=None): """ Execute shell command @@ -66,13 +67,25 @@ def _call(command, logoutput=False, throw_on_failure=True, cwd=cwd, env=env, shell=False, preexec_fn=preexec_fn) + logAnyway = False if not wait_for_finish: if pid_file_name: pidfile = open(pid_file_name, 'w') pidfile.write(str(proc.pid)) pidfile.close() - return None, None - + + ## wait poll_after seconds and poll + if poll_after: + time.sleep(poll_after) + if proc.poll() is None: + return None, None #if still running then return + else: + logAnyway = True #assume failure and log + Logger.warning("Process is not up after the polling interval " + str(poll_after) + " seconds.") + else: + return None, None + + if timeout: q = Queue() t = threading.Timer( timeout, on_timeout, [proc, q] ) @@ -89,7 +102,7 @@ def _call(command, logoutput=False, throw_on_failure=True, code = proc.returncode - if logoutput and out: + if (logoutput or logAnyway) and out: Logger.info(out) if throw_on_failure and code: http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/slider-agent/src/main/python/resource_management/libraries/functions/check_process_status.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/resource_management/libraries/functions/check_process_status.py b/slider-agent/src/main/python/resource_management/libraries/functions/check_process_status.py index b491f66..8f10455 100644 --- a/slider-agent/src/main/python/resource_management/libraries/functions/check_process_status.py +++ 
b/slider-agent/src/main/python/resource_management/libraries/functions/check_process_status.py @@ -42,7 +42,7 @@ def check_process_status(pid_file): """ if not pid_file or not os.path.isfile(pid_file): if not pid_file: - Logger.warn("pid_file is not valid") + Logger.warning("pid_file is not valid") else: Logger.info("pid file does not exist {0}".format(pid_file)) raise ComponentIsNotRunning() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9641bd0c/slider-agent/src/test/python/resource_management/TestExecuteResource.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/resource_management/TestExecuteResource.py b/slider-agent/src/test/python/resource_management/TestExecuteResource.py index f0a4539..113644d 100644 --- a/slider-agent/src/test/python/resource_management/TestExecuteResource.py +++ b/slider-agent/src/test/python/resource_management/TestExecuteResource.py @@ -62,6 +62,31 @@ class TestExecuteResource(TestCase): self.assertTrue(popen_mock.called, 'subprocess.Popen should have been called!') self.assertFalse(proc_communicate_mock.called, 'proc.communicate should not have been called!') + @patch('subprocess.Popen.communicate') + @patch('subprocess.Popen') + def test_attribute_wait_and_poll(self, popen_mock, proc_communicate_mock): + with Environment("/") as env: + try: + Execute('echo "1"', + wait_for_finish=False, + poll_after = 5) + self.assertTrue(False, "Should fail as process does not run for 5 seconds") + except Fail as e: + self.assertTrue("returned 1" in e.message) + pass + + self.assertTrue(popen_mock.called, 'subprocess.Popen should have been called!') + self.assertFalse(proc_communicate_mock.called, 'proc.communicate should not have been called!') + + @patch('subprocess.Popen.communicate') + def test_attribute_wait_and_poll_and_success(self, proc_communicate_mock): + with Environment("/") as env: + Execute('sleep 6', + wait_for_finish=False, + poll_after = 2) + + 
self.assertFalse(proc_communicate_mock.called, 'proc.communicate should not have been called!') + @patch.object(os.path, "exists") @patch.object(subprocess, "Popen") def test_attribute_creates(self, popen_mock, exists_mock):