Repository: aurora Updated Branches: refs/heads/master d752d466c -> 7fc4bff7b
Exposing DSL defined variables to shell health checkers Bugs closed: AURORA-1622 Reviewed at https://reviews.apache.org/r/44486/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/7fc4bff7 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/7fc4bff7 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/7fc4bff7 Branch: refs/heads/master Commit: 7fc4bff7b7d738dd20f2556d35aa3967245bd931 Parents: d752d46 Author: Dmitriy Shirchenko <cald...@gmail.com> Authored: Fri Mar 11 14:48:59 2016 -0700 Committer: John Sirois <john.sir...@gmail.com> Committed: Fri Mar 11 14:48:59 2016 -0700 ---------------------------------------------------------------------- .../apache/aurora/common/health_check/shell.py | 2 +- .../aurora/executor/common/health_checker.py | 32 ++++++++++++++++++-- .../aurora/common/health_check/test_shell.py | 12 +++++--- .../executor/common/test_health_checker.py | 32 +++++++++++++++++++- 4 files changed, 70 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/7fc4bff7/src/main/python/apache/aurora/common/health_check/shell.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/common/health_check/shell.py b/src/main/python/apache/aurora/common/health_check/shell.py index 890bf0c..bf63d93 100644 --- a/src/main/python/apache/aurora/common/health_check/shell.py +++ b/src/main/python/apache/aurora/common/health_check/shell.py @@ -47,7 +47,7 @@ class ShellHealthCheck(object): """ cmd = shlex.split(self.cmd) try: - subprocess.check_call(cmd, timeout=self.timeout_secs) + subprocess.check_call(cmd, timeout=self.timeout_secs, shell=True) return True, None except subprocess.CalledProcessError as reason: # The command didn't return a 0 so provide reason for failure. http://git-wip-us.apache.org/repos/asf/aurora/blob/7fc4bff7/src/main/python/apache/aurora/executor/common/health_checker.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py index 3039727..28fd3ec 100644 --- a/src/main/python/apache/aurora/executor/common/health_checker.py +++ b/src/main/python/apache/aurora/executor/common/health_checker.py @@ -18,12 +18,15 @@ import time import traceback from mesos.interface.mesos_pb2 import TaskState +from pystachio import Environment, String from twitter.common import log from twitter.common.exceptions import ExceptionalThread from twitter.common.metrics import LambdaGauge from apache.aurora.common.health_check.http_signaler import HttpSignaler from apache.aurora.common.health_check.shell import ShellHealthCheck +from apache.aurora.config.schema.base import MesosContext +from apache.thermos.config.schema import ThermosContext from .status_checker import StatusChecker, StatusCheckerProvider, StatusResult from .task_info import mesos_task_instance_from_assigned_task, resolve_ports @@ -203,6 +206,25 @@ class HealthChecker(StatusChecker): class HealthCheckerProvider(StatusCheckerProvider): + + @staticmethod + def interpolate_cmd(task, cmd): + """ + :param task: Assigned task passed from Mesos Agent + :param cmd: Command defined inside shell_command inside config. + :return: Interpolated cmd with filled in values, for example ports. + """ + thermos_namespace = ThermosContext( + task_id=task.taskId, + ports=task.assignedPorts) + mesos_namespace = MesosContext(instance=task.instanceId) + command = String(cmd) % Environment( + thermos=thermos_namespace, + mesos=mesos_namespace + ) + + return command.get() + def from_assigned_task(self, assigned_task, sandbox): """ :param assigned_task: @@ -215,9 +237,15 @@ class HealthCheckerProvider(StatusCheckerProvider): timeout_secs = health_check_config.get('timeout_secs') if SHELL_HEALTH_CHECK in health_checker: shell_command = health_checker.get(SHELL_HEALTH_CHECK, {}).get('shell_command') + # Filling in variables eg thermos.ports[http] that could have been passed in as part of + # shell_command. + interpolated_command = HealthCheckerProvider.interpolate_cmd( + task=assigned_task, + cmd=shell_command + ) shell_signaler = ShellHealthCheck( - cmd=shell_command, - timeout_secs=timeout_secs + cmd=interpolated_command, + timeout_secs=timeout_secs, ) a_health_checker = lambda: shell_signaler() else: http://git-wip-us.apache.org/repos/asf/aurora/blob/7fc4bff7/src/test/python/apache/aurora/common/health_check/test_shell.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/common/health_check/test_shell.py b/src/test/python/apache/aurora/common/health_check/test_shell.py index 84f717f..8d3a3e4 100644 --- a/src/test/python/apache/aurora/common/health_check/test_shell.py +++ b/src/test/python/apache/aurora/common/health_check/test_shell.py @@ -41,7 +41,8 @@ class TestHealthChecker(unittest.TestCase): self.assertIsNone(msg) mock_sub.assert_called_once_with( ['success', 'cmd'], - timeout=30 + timeout=30, + shell=True, ) @mock.patch('subprocess32.check_call') @@ -54,7 +55,8 @@ class TestHealthChecker(unittest.TestCase): success, msg = shell() mock_sub.assert_called_once_with( ['cmd', 'to', 'fail'], - timeout=30 + timeout=30, + shell=True, ) self.assertFalse(success) self.assertEqual(msg, "Command 'failed' returned non-zero exit status 1") @@ -69,7 +71,8 @@ class TestHealthChecker(unittest.TestCase): success, msg = shell() mock_sub.assert_called_once_with( ['cmd', 'to', 'not', 'exist'], - timeout=30 + timeout=30, + shell=True, ) self.assertFalse(success) self.assertEqual(msg, 'OSError: failed') @@ -85,7 +88,8 @@ class TestHealthChecker(unittest.TestCase): success, msg = shell() mock_sub.assert_called_once_with( ['defensive', 'cmd'], - timeout=10 + timeout=10, + shell=True, ) self.assertFalse(success) self.assertEqual(msg, 'Invalid commmand.') http://git-wip-us.apache.org/repos/asf/aurora/blob/7fc4bff7/src/test/python/apache/aurora/executor/common/test_health_checker.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py index 9bebce8..19c4f76 100644 --- a/src/test/python/apache/aurora/executor/common/test_health_checker.py +++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py @@ -267,7 +267,7 @@ class TestHealthCheckerProvider(unittest.TestCase): ).json_dumps() ) ) - assigned_task = AssignedTask(task=task_config, instanceId=1) + assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'foo': 9001}) execconfig_data = json.loads(assigned_task.task.executorConfig.data) assert execconfig_data[ 'health_check_config']['health_checker']['shell']['shell_command'] == 'failed command' @@ -277,6 +277,36 @@ class TestHealthCheckerProvider(unittest.TestCase): hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures assert hct_max_fail == max_consecutive_failures + def test_interpolate_cmd(self): + """Making sure thermos.ports[foo] gets correctly substituted with assignedPorts info.""" + interval_secs = 17 + initial_interval_secs = 3 + max_consecutive_failures = 2 + timeout_secs = 5 + shell_cmd = 'FOO_PORT={{thermos.ports[foo]}} failed command' + shell_config = ShellHealthChecker(shell_command=shell_cmd) + task_config = TaskConfig( + executorConfig=ExecutorConfig( + name='thermos-generic', + data=MESOS_JOB( + task=HELLO_WORLD, + health_check_config=HealthCheckConfig( + health_checker=HealthCheckerConfig(shell=shell_config), + interval_secs=interval_secs, + initial_interval_secs=initial_interval_secs, + max_consecutive_failures=max_consecutive_failures, + timeout_secs=timeout_secs, + ) + ).json_dumps() + ) + ) + assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'foo': 9001}) + interpolated_cmd = HealthCheckerProvider.interpolate_cmd( + assigned_task, + cmd=shell_cmd + ) + assert interpolated_cmd == 'FOO_PORT=9001 failed command' + def test_from_assigned_task_no_health_port(self): interval_secs = 17 initial_interval_secs = 3