Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes e9ed005df -> 236f5e0d3 (forced update)
introduced process grouping Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/236f5e0d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/236f5e0d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/236f5e0d Branch: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes Commit: 236f5e0d3f23092db2634597bc10e655b67831fb Parents: 75112ab Author: max-orlov <ma...@gigaspaces.com> Authored: Sun Jun 25 12:19:02 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Sun Jun 25 13:07:47 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/core/engine.py | 12 +++++- aria/orchestrator/workflows/executor/base.py | 7 ++++ aria/orchestrator/workflows/executor/process.py | 29 +++++++++++--- .../orchestrator/workflows/executor/__init__.py | 7 ++++ .../workflows/executor/test_process_executor.py | 41 ++++++++++++++++++++ tests/requirements.txt | 1 + 6 files changed, 90 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/236f5e0d/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index d5a6e70..02b1dee 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -49,7 +49,6 @@ class Engine(logger.LoggerMixin): if resuming: events.on_resume_workflow_signal.send(ctx) - try: events.start_workflow_signal.send(ctx) while True: @@ -65,13 +64,22 @@ class Engine(logger.LoggerMixin): else: time.sleep(0.1) if cancel: - events.on_cancelled_workflow_signal.send(ctx) + try: + self._terminate_tasks(executing_tasks) + finally: + 
events.on_cancelled_workflow_signal.send(ctx) else: events.on_success_workflow_signal.send(ctx) except BaseException as e: + # Cleanup any remaining tasks + self._terminate_tasks(executing_tasks) events.on_failure_workflow_signal.send(ctx, exception=e) raise + def _terminate_tasks(self, tasks): + for task in tasks: + self._executors[task._executor].terminate(task) + @staticmethod def cancel_execution(ctx): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/236f5e0d/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 6a3c9d2..038a2e3 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -48,6 +48,13 @@ class BaseExecutor(logger.LoggerMixin): """ pass + def terminate(self, ctx): + """ + Terminate the executing task + :return: + """ + pass + @staticmethod def _task_started(ctx): events.start_task_signal.send(ctx) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/236f5e0d/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 8518b33..354210d 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -25,6 +25,10 @@ import sys # As part of the process executor implementation, subprocess are started with this module as their # entry point. We thus remove this module's directory from the python path if it happens to be # there +from collections import namedtuple + +import signal + script_dir = os.path.dirname(__file__) if script_dir in sys.path: sys.path.remove(script_dir) @@ -57,6 +61,9 @@ UPDATE_TRACKED_CHANGES_FAILED_STR = \ 'Some changes failed writing to storage. 
For more info refer to the log.' +_Task = namedtuple('_Task', 'proc, ctx') + + class ProcessExecutor(base.BaseExecutor): """ Executor which runs tasks in a subprocess environment @@ -113,9 +120,18 @@ class ProcessExecutor(base.BaseExecutor): self._server_socket.close() self._listener_thread.join(timeout=60) + def terminate(self, ctx): + task = self._tasks.get(ctx.task.id) + # The process might have managed to finish so it would not be in the tasks list + if task: + if os.getsid(os.getpid()) != os.getpgid(task.proc.pid): + # If the above condition is false, the process group leader is the group leader + # for the current session of the system, and killing it will kill the entire + # OS session. + os.killpg(os.getpgid(task.proc.pid), signal.SIGTERM) + def _execute(self, ctx): self._check_closed() - self._tasks[ctx.task.id] = ctx # Temporary file used to pass arguments to the started subprocess file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') @@ -125,11 +141,14 @@ class ProcessExecutor(base.BaseExecutor): env = self._construct_subprocess_env(task=ctx.task) # Asynchronously start the operation in a subprocess - subprocess.Popen( + proc = subprocess.Popen( '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path), env=env, + preexec_fn=os.setsid, shell=True) + self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc) + def _remove_task(self, task_id): return self._tasks.pop(task_id) @@ -191,15 +210,15 @@ class ProcessExecutor(base.BaseExecutor): _send_message(connection, response) def _handle_task_started_request(self, task_id, **kwargs): - self._task_started(self._tasks[task_id]) + self._task_started(self._tasks[task_id].ctx) def _handle_task_succeeded_request(self, task_id, **kwargs): task = self._remove_task(task_id) - self._task_succeeded(task) + self._task_succeeded(task.ctx) def _handle_task_failed_request(self, task_id, request, **kwargs): task = self._remove_task(task_id) - self._task_failed(task, 
exception=request['exception'], traceback=request['traceback']) + self._task_failed(task.ctx, exception=request['exception'], traceback=request['traceback']) def _send_message(connection, message): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/236f5e0d/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index 83584a6..99d0b39 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -18,10 +18,13 @@ from contextlib import contextmanager import aria from aria.modeling import models +from aria.orchestrator.context.common import BaseContext class MockContext(object): + INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS + def __init__(self, storage, task_kwargs=None): self.logger = logging.getLogger('mock_logger') self._task_kwargs = task_kwargs or {} @@ -46,6 +49,10 @@ class MockContext(object): def close(self): pass + @property + def model(self): + return self._storage + @classmethod def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None): return cls(storage=aria.application_model_storage(**(storage_kwargs or {})), http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/236f5e0d/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 755b9be..2243fb6 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -14,17 +14,23 @@ # limitations under the License. 
import os +import time import Queue +import subprocess import pytest +import psutil import aria +from aria import operation +from aria.modeling import models from aria.orchestrator import events from aria.utils.plugin import create as create_plugin from aria.orchestrator.workflows.executor import process import tests.storage import tests.resources +from tests.helpers import FilesystemDataHolder from tests.fixtures import ( # pylint: disable=unused-import plugins_dir, plugin_manager, @@ -71,6 +77,33 @@ class TestProcessExecutor(object): executor.execute(MockContext(model, task_kwargs=dict(function='some.function'))) assert 'closed' in exc_info.value.message + def test_process_termination(self, executor, model, fs_test_holder): + ctx = MockContext( + model, + task_kwargs=dict( + function='{0}.{1}'.format(__name__, freezing_task.__name__), + arguments=dict(holder_path=models.Argument.wrap('holder_path', + fs_test_holder._path))), + ) + + executor.execute(ctx) + + while fs_test_holder.get('subproc', None) is None: + time.sleep(1) + pids = [executor._tasks[ctx.task.id].proc.pid, fs_test_holder['subproc']] + assert any(p.pid == pid for p in psutil.process_iter() for pid in pids) + executor.terminate(ctx) + assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE + for p in psutil.process_iter() + for pid in pids) + + +@pytest.fixture +def fs_test_holder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = FilesystemDataHolder(dataholder_path) + return holder + @pytest.fixture def executor(plugin_manager): @@ -92,3 +125,11 @@ def model(tmpdir): initiator_kwargs=dict(base_dir=str(tmpdir))) yield _storage tests.storage.release_sqlite_storage(_storage) + + +@operation +def freezing_task(holder_path, **_): + holder = FilesystemDataHolder(holder_path) + holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid + while True: + time.sleep(5) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/236f5e0d/tests/requirements.txt ---------------------------------------------------------------------- diff --git a/tests/requirements.txt b/tests/requirements.txt index 71a227a..cf57821 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -13,6 +13,7 @@ testtools fasteners==0.13.0 sh==1.12.13 +psutil==5.2.2 mock==1.0.1 pylint==1.6.4 pytest==3.0.2