introduced process grouping

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b1dd1c09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b1dd1c09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b1dd1c09

Branch: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes
Commit: b1dd1c09f196703896a27daaaa638c98b878599b
Parents: a75a3de
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jun 25 12:19:02 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Jun 25 15:32:42 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py      | 11 +++++-
 aria/orchestrator/workflows/executor/base.py    |  7 ++++
 aria/orchestrator/workflows/executor/process.py | 29 +++++++++++---
 .../orchestrator/workflows/executor/__init__.py |  7 ++++
 .../workflows/executor/test_process_executor.py | 41 ++++++++++++++++++++
 tests/requirements.txt                          |  1 +
 6 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1dd1c09/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py 
b/aria/orchestrator/workflows/core/engine.py
index d52ae85..373abb8 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -66,13 +66,22 @@ class Engine(logger.LoggerMixin):
                 else:
                     time.sleep(0.1)
             if cancel:
-                events.on_cancelled_workflow_signal.send(ctx)
+                try:
+                    self._terminate_tasks(tasks_tracker.executing_tasks)
+                finally:
+                    events.on_cancelled_workflow_signal.send(ctx)
             else:
                 events.on_success_workflow_signal.send(ctx)
         except BaseException as e:
+            # Cleanup any remaining tasks
+            self._terminate_tasks(tasks_tracker.executing_tasks)
             events.on_failure_workflow_signal.send(ctx, exception=e)
             raise
 
+    def _terminate_tasks(self, tasks):
+        for task in tasks:
+            self._executors[task._executor].terminate(task)
+
     @staticmethod
     def cancel_execution(ctx):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1dd1c09/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py 
b/aria/orchestrator/workflows/executor/base.py
index 6a3c9d2..038a2e3 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -48,6 +48,13 @@ class BaseExecutor(logger.LoggerMixin):
         """
         pass
 
+    def terminate(self, ctx):
+        """
+        Terminate the executing task
+        :return:
+        """
+        pass
+
     @staticmethod
     def _task_started(ctx):
         events.start_task_signal.send(ctx)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1dd1c09/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py 
b/aria/orchestrator/workflows/executor/process.py
index 8518b33..354210d 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -25,6 +25,10 @@ import sys
 # As part of the process executor implementation, subprocess are started with 
this module as their
 # entry point. We thus remove this module's directory from the python path if 
it happens to be
 # there
+from collections import namedtuple
+
+import signal
+
 script_dir = os.path.dirname(__file__)
 if script_dir in sys.path:
     sys.path.remove(script_dir)
@@ -57,6 +61,9 @@ UPDATE_TRACKED_CHANGES_FAILED_STR = \
     'Some changes failed writing to storage. For more info refer to the log.'
 
 
+_Task = namedtuple('_Task', 'proc, ctx')
+
+
 class ProcessExecutor(base.BaseExecutor):
     """
     Executor which runs tasks in a subprocess environment
@@ -113,9 +120,18 @@ class ProcessExecutor(base.BaseExecutor):
         self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
+    def terminate(self, ctx):
+        task = self._tasks.get(ctx.task.id)
+        # The process might have managed to finish, so it would not be in the tasks list
+        if task:
+            if os.getsid(os.getpid()) != os.getpgid(task.proc.pid):
+                # If the above condition is false, the task's process group is
+                # also the process group of the current OS session, and killing
+                # it would kill the entire session.
+                os.killpg(os.getpgid(task.proc.pid), signal.SIGTERM)
+
     def _execute(self, ctx):
         self._check_closed()
-        self._tasks[ctx.task.id] = ctx
 
         # Temporary file used to pass arguments to the started subprocess
         file_descriptor, arguments_json_path = 
tempfile.mkstemp(prefix='executor-', suffix='.json')
@@ -125,11 +141,14 @@ class ProcessExecutor(base.BaseExecutor):
 
         env = self._construct_subprocess_env(task=ctx.task)
         # Asynchronously start the operation in a subprocess
-        subprocess.Popen(
+        proc = subprocess.Popen(
             '{0} {1} {2}'.format(sys.executable, __file__, 
arguments_json_path),
             env=env,
+            preexec_fn=os.setsid,
             shell=True)
 
+        self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc)
+
     def _remove_task(self, task_id):
         return self._tasks.pop(task_id)
 
@@ -191,15 +210,15 @@ class ProcessExecutor(base.BaseExecutor):
                 _send_message(connection, response)
 
     def _handle_task_started_request(self, task_id, **kwargs):
-        self._task_started(self._tasks[task_id])
+        self._task_started(self._tasks[task_id].ctx)
 
     def _handle_task_succeeded_request(self, task_id, **kwargs):
         task = self._remove_task(task_id)
-        self._task_succeeded(task)
+        self._task_succeeded(task.ctx)
 
     def _handle_task_failed_request(self, task_id, request, **kwargs):
         task = self._remove_task(task_id)
-        self._task_failed(task, exception=request['exception'], 
traceback=request['traceback'])
+        self._task_failed(task.ctx, exception=request['exception'], 
traceback=request['traceback'])
 
 
 def _send_message(connection, message):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1dd1c09/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py 
b/tests/orchestrator/workflows/executor/__init__.py
index 83584a6..99d0b39 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -18,10 +18,13 @@ from contextlib import contextmanager
 
 import aria
 from aria.modeling import models
+from aria.orchestrator.context.common import BaseContext
 
 
 class MockContext(object):
 
+    INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
+
     def __init__(self, storage, task_kwargs=None):
         self.logger = logging.getLogger('mock_logger')
         self._task_kwargs = task_kwargs or {}
@@ -46,6 +49,10 @@ class MockContext(object):
     def close(self):
         pass
 
+    @property
+    def model(self):
+        return self._storage
+
     @classmethod
     def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
         return cls(storage=aria.application_model_storage(**(storage_kwargs or 
{})),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1dd1c09/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py 
b/tests/orchestrator/workflows/executor/test_process_executor.py
index 755b9be..2243fb6 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -14,17 +14,23 @@
 # limitations under the License.
 
 import os
+import time
 import Queue
+import subprocess
 
 import pytest
+import psutil
 
 import aria
+from aria import operation
+from aria.modeling import models
 from aria.orchestrator import events
 from aria.utils.plugin import create as create_plugin
 from aria.orchestrator.workflows.executor import process
 
 import tests.storage
 import tests.resources
+from tests.helpers import FilesystemDataHolder
 from tests.fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
@@ -71,6 +77,33 @@ class TestProcessExecutor(object):
             executor.execute(MockContext(model, 
task_kwargs=dict(function='some.function')))
         assert 'closed' in exc_info.value.message
 
+    def test_process_termination(self, executor, model, fs_test_holder):
+        ctx = MockContext(
+            model,
+            task_kwargs=dict(
+                function='{0}.{1}'.format(__name__, freezing_task.__name__),
+                arguments=dict(holder_path=models.Argument.wrap('holder_path',
+                                                                
fs_test_holder._path))),
+        )
+
+        executor.execute(ctx)
+
+        while fs_test_holder.get('subproc', None) is None:
+            time.sleep(1)
+        pids = [executor._tasks[ctx.task.id].proc.pid, 
fs_test_holder['subproc']]
+        assert any(p.pid == pid for p in psutil.process_iter() for pid in pids)
+        executor.terminate(ctx)
+        assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE
+                       for p in psutil.process_iter()
+                       for pid in pids)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+    dataholder_path = str(tmpdir.join('dataholder'))
+    holder = FilesystemDataHolder(dataholder_path)
+    return holder
+
 
 @pytest.fixture
 def executor(plugin_manager):
@@ -92,3 +125,11 @@ def model(tmpdir):
                                               
initiator_kwargs=dict(base_dir=str(tmpdir)))
     yield _storage
     tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, **_):
+    holder = FilesystemDataHolder(holder_path)
+    holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', 
shell=True).pid
+    while True:
+        time.sleep(5)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1dd1c09/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 71a227a..cf57821 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -13,6 +13,7 @@
 testtools
 fasteners==0.13.0
 sh==1.12.13
+psutil==5.2.2
 mock==1.0.1
 pylint==1.6.4
 pytest==3.0.2

Reply via email to