ARIA-4 Create an API for creating workflows

An API for creating workflows. Users can build graphs of tasks and
set dependencies between tasks to execute them in a specific
order.

This commit also includes minimal reorganization of a few test
modules, so they are now using the same file system hierarchy as
the modules which they test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/107c9729
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/107c9729
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/107c9729

Branch: refs/heads/ARIA-3-api-for-creating-workflows
Commit: 107c972981dbd2ce0bbc3ea55e2801049031a581
Parents: e1c919d
Author: Ran Ziv <r...@gigaspaces.com>
Authored: Fri Oct 21 13:21:25 2016 +0300
Committer: Ran Ziv <r...@gigaspaces.com>
Committed: Sun Oct 30 08:51:19 2016 +0200

----------------------------------------------------------------------
 aria/contexts.py                                |   2 +-
 aria/workflows/api/task_graph.py                | 305 +++++++++
 aria/workflows/api/tasks_graph.py               | 203 ------
 tests/workflows/api/__init__.py                 |   0
 tests/workflows/api/test_task_graph.py          | 658 +++++++++++++++++++
 tests/workflows/core/__init__.py                |   0
 tests/workflows/core/test_executor.py           | 117 ++++
 .../test_task_graph_into_exececution_graph.py   |  79 +++
 tests/workflows/test_executor.py                | 117 ----
 .../test_task_graph_into_exececution_graph.py   |  79 ---
 10 files changed, 1160 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/aria/contexts.py
----------------------------------------------------------------------
diff --git a/aria/contexts.py b/aria/contexts.py
index ae7fc66..3bda9be 100644
--- a/aria/contexts.py
+++ b/aria/contexts.py
@@ -21,7 +21,7 @@ from uuid import uuid4
 
 from aria.logger import LoggerMixin
 from aria.tools.lru_cache import lru_cache
-from aria.workflows.api.tasks_graph import TaskGraph
+from aria.workflows.api.task_graph import TaskGraph
 
 
 class WorkflowContext(LoggerMixin):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/aria/workflows/api/task_graph.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/task_graph.py b/aria/workflows/api/task_graph.py
new file mode 100644
index 0000000..fdc4e2b
--- /dev/null
+++ b/aria/workflows/api/task_graph.py
@@ -0,0 +1,305 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Task graph. Used by users to build workflows
+"""
+
+from uuid import uuid4
+
+from networkx import DiGraph
+
+
+class TaskNotInGraphError(Exception):
+    """
+    An error representing a scenario where a given task is not in the graph as expected
+    """
+    pass
+
+
+class TaskGraph(object):
+    """
+    A tasks graph builder.
+    Build an operations flow graph
+    """
+
+    def __init__(self, name):
+        self.name = name
+        self._id = str(uuid4())
+        self._graph = DiGraph()
+
+    def __repr__(self):
+        return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format(
+            name=self.__class__.__name__, self=self)
+
+    # graph traversal methods
+
+    @property
+    def tasks(self):
+        """
+        An iterator on tasks added to the graph
+        :yields: Iterator over all tasks in the graph
+        """
+        for _, data in self._graph.nodes_iter(data=True):
+            yield data['task']
+
+    def get_task_dependencies(self, dependent_task):
+        """
+        Iterates over the task's dependencies
+        :param BaseTask dependent_task: The task whose dependencies are requested
+        :yields: Iterator over all tasks which dependent_task depends on
+        :raise: TaskNotInGraphError if dependent_task is not in the graph
+        """
+        if not self.has_task(dependent_task):
+            raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id))
+        for _, dependency_id in self._graph.out_edges_iter(dependent_task.id):
+            yield self.get_task(dependency_id)
+
+    def get_task_dependents(self, dependency_task):
+        """
+        Iterates over the task's dependents
+        :param BaseTask dependency_task: The task whose dependents are requested
+        :yields: Iterator over all tasks which depend on dependency_task
+        :raise: TaskNotInGraphError if dependency_task is not in the graph
+        """
+        if not self.has_task(dependency_task):
+            raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id))
+        for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id):
+            yield self.get_task(dependent_id)
+
+    # task methods
+
+    def get_task(self, task_id):
+        """
+        Get a task instance that's been inserted to the graph by the task's id
+        :param basestring task_id: The task's id
+        :return: Requested task
+        :rtype: BaseTask
+        :raise: TaskNotInGraphError if no task found in the graph with the given id
+        """
+        if not self._graph.has_node(task_id):
+            raise TaskNotInGraphError('Task id: {0}'.format(task_id))
+        data = self._graph.node[task_id]
+        return data['task']
+
+    def add_task(self, task):
+        """
+        Add a task to the graph
+        :param BaseTask task: The task
+        :return: False if task was already in the graph, otherwise True
+        :rtype: bool
+        """
+        if self.has_task(task):
+            return False
+        self._graph.add_node(task.id, task=task)
+        return True
+
+    def remove_task(self, task):
+        """
+        Remove the provided task from the graph
+        :param BaseTask task: The task
+        :return: False if task wasn't in the graph, otherwise True
+        :rtype: bool
+        """
+        if not self.has_task(task):
+            return False
+        self._graph.remove_node(task.id)
+        return True
+
+    def has_task(self, task):
+        """
+        Check whether a task is in the graph or not
+        :param BaseTask task: The task
+        :return: True if task is in the graph, otherwise False
+        :rtype: bool
+        """
+        return self._graph.has_node(task.id)
+
+    def add_dependency(self, dependent, dependency):
+        """
+        Add a dependency for one item (task, sequence or parallel) on another
+        The dependent will only be executed after the dependency terminates
+        If either of the items is either a sequence or a parallel,
+         multiple dependencies may be added
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: True if the dependency between the two hadn't already existed, otherwise False
+        :rtype: bool
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks which
+         are not in the graph
+        """
+        if self.has_dependency(dependent, dependency):
+            return False
+
+        dependent_tasks, dependency_tasks = \
+            self._extract_dependent_and_dependency_tasks(dependent, dependency)
+        for dependent_task in dependent_tasks:
+            for dependency_task in dependency_tasks:
+                self._graph.add_edge(dependent_task.id, dependency_task.id)
+        return True
+
+    def has_dependency(self, dependent, dependency):
+        """
+        Check whether one item (task, sequence or parallel) depends on another
+
+        Note that if either of the items is either a sequence or a parallel,
+        and some of the dependencies exist in the graph but not all of them,
+        this method will return False
+
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: True if the dependency between the two exists, otherwise False
+        :rtype: bool
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks
+         which are not in the graph
+        """
+        dependent_tasks, dependency_tasks = \
+            self._extract_dependent_and_dependency_tasks(dependent, dependency)
+        for dependent_task in dependent_tasks:
+            for dependency_task in dependency_tasks:
+                if not self._graph.has_edge(dependent_task.id, dependency_task.id):
+                    return False
+        return True
+
+    def remove_dependency(self, dependent, dependency):
+        """
+        Remove a dependency for one item (task, sequence or parallel) on another
+
+        Note that if either of the items is either a sequence or a parallel, and some of
+        the dependencies exist in the graph but not all of them, this method will not remove
+        any of the dependencies and return False
+
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: False if the dependency between the two hadn't existed, otherwise True
+        :rtype: bool
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks
+         which are not in the graph
+        """
+        if not self.has_dependency(dependent, dependency):
+            return False
+
+        dependent_tasks, dependency_tasks = \
+            self._extract_dependent_and_dependency_tasks(dependent, dependency)
+        for dependent_task in dependent_tasks:
+            for dependency_task in dependency_tasks:
+                self._graph.remove_edge(dependent_task.id, dependency_task.id)
+        return True
+
+    def sequence(self, item, *items):
+        """
+        A graph-building helper, useful for chaining items in a sequence -
+        it'll add a dependency for each item on the one that came before it
+
+        an item may be either a task, a sequence or a parallel
+        this method will also add to the graph each of the items not already present in it
+
+        :param BaseTask|_TasksArrangement item: The first item (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement *items: The rest of the items (task, sequence or parallel)
+        :return: The sequence object
+        :rtype: _TasksSequence
+        """
+        all_items = [item] + list(items)
+        prev_item = None
+
+        for an_item in all_items:
+            if not isinstance(an_item, TaskGraph._TasksArrangement) and not self.has_task(an_item):
+                self.add_task(an_item)
+            if prev_item is not None:
+                self.add_dependency(an_item, prev_item)
+            prev_item = an_item
+
+        return self._TasksSequence(all_items)
+
+    def parallel(self, item, *items):
+        """
+        A graph-building helper, useful for allowing for several items to execute in parallel
+        this method won't actually add any dependency between the items passed to it -
+        but the returned object may be used to add dependencies for and on this group of items
+
+        an item may be either a task, a sequence or a parallel
+        this method will also add to the graph each of the items not already present in it
+
+        :param BaseTask|_TasksArrangement item: One item (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement *items: The rest of the items (task, sequence or parallel)
+        :return: The parallel object
+        :rtype: _TasksParallel
+        """
+        all_items = [item] + list(items)
+
+        for an_item in all_items:
+            if not isinstance(an_item, TaskGraph._TasksArrangement) and not self.has_task(an_item):
+                self.add_task(an_item)
+
+        return self._TasksParallel(all_items)
+
+    def _extract_dependent_and_dependency_tasks(self, dependent, dependency):
+        if isinstance(dependent, TaskGraph._TasksArrangement):
+            dependent_tasks = dependent.get_first_tasks()
+        else:
+            if not self._graph.has_node(dependent.id):
+                raise TaskNotInGraphError(
+                    'dependent task {0!r} is not in graph (task id: {0.id})'.format(dependent))
+            dependent_tasks = [dependent]
+
+        if isinstance(dependency, TaskGraph._TasksArrangement):
+            dependency_tasks = dependency.get_last_tasks()
+        else:
+            if not self._graph.has_node(dependency.id):
+                raise TaskNotInGraphError(
+                    'dependency task {0!r} is not in graph (task id: {0.id})'.format(dependency))
+            dependency_tasks = [dependency]
+
+        return dependent_tasks, dependency_tasks
+
+    class _TasksArrangement(object):
+        def __init__(self, tasks):
+            self._tasks = tasks
+
+        def get_first_tasks(self):
+            """
+            The first tasks to be executed in the arrangement
+            :rtype list
+            :return: list of the first tasks to be executed in the arrangement
+            """
+            raise NotImplementedError()
+
+        def get_last_tasks(self):
+            """
+            The last tasks to be executed in the arrangement
+            :rtype list
+            :return: list of the last tasks to be executed in the arrangement
+            """
+            raise NotImplementedError()
+
+    class _TasksSequence(_TasksArrangement):
+        def __init__(self, tasks):
+            super(TaskGraph._TasksSequence, self).__init__(tasks)
+
+        def get_first_tasks(self):
+            return [self._tasks[0]]
+
+        def get_last_tasks(self):
+            return [self._tasks[-1]]
+
+    class _TasksParallel(_TasksArrangement):
+        def __init__(self, tasks):
+            super(TaskGraph._TasksParallel, self).__init__(tasks)
+
+        def get_first_tasks(self):
+            return self._tasks
+
+        def get_last_tasks(self):
+            return self._tasks

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/aria/workflows/api/tasks_graph.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/tasks_graph.py b/aria/workflows/api/tasks_graph.py
deleted file mode 100644
index 5160345..0000000
--- a/aria/workflows/api/tasks_graph.py
+++ /dev/null
@@ -1,203 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from uuid import uuid4
-
-from networkx import DiGraph, topological_sort
-
-from aria.tools.validation import ValidatorMixin
-
-
-class TaskNotFoundError(Exception):
-    pass
-
-
-class TaskNotInGraphError(Exception):
-    pass
-
-
-class TaskGraph(ValidatorMixin):
-    """
-    A task graph builder.
-    Build a operations flow graph
-    """
-
-    def __init__(self, name):
-        self.name = name
-        self.id = str(uuid4())
-        self.graph = DiGraph()
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.graph, attr)
-        except AttributeError:
-            return super(TaskGraph, self).__getattribute__(attr)
-
-    def __repr__(self):
-        return '{name}(id={self.id}, name={self.name}, graph={self.graph!r})'.format(
-            name=self.__class__.__name__, self=self)
-
-    @property
-    def tasks(self):
-        """
-        An iterator on tasks added to the graph
-        """
-        for _, data in self.graph.nodes_iter(data=True):
-            yield data['task']
-
-    @property
-    def leaf_tasks(self):
-        for task in self.tasks_in_order():
-            if not self.graph.predecessors(task.id):
-                yield task
-
-    def task_tree(self, reverse=False):
-        """
-        Iterates over the tasks to be executed in topological order and their dependencies.
-        :param reverse: reverse the order
-        """
-        for task in self.tasks_in_order(reverse=reverse):
-            yield task, self.task_dependencies(task)
-
-    def tasks_in_order(self, reverse=False):
-        """
-        Iterates over the tasks to be executed in topological order
-        :param reverse: reverse the order
-        """
-        for task_id in topological_sort(self.graph, reverse=reverse):
-            yield self.graph.node[task_id]['task']
-
-    def has_dependencies(self, task):
-        return len(self.task_dependencies(task)) > 0
-
-    def task_dependencies(self, task):
-        """
-        Iterates over the task dependencies
-        """
-        for task_ids in self.graph.edges_iter(task.id):
-            for task_id in task_ids:
-                if task.id != task_id:
-                    yield self.get_task(task_id)
-
-    def add_task(self, task):
-        """
-        Add a task to this graph
-        :param WorkflowTask|TaskGraph task: The task
-        """
-        self.graph.add_node(task.id, task=task)
-
-    def get_task(self, task_id):
-        """
-        Get a task instance that was inserted to this graph by its id
-
-        :param basestring task_id: the task id
-        :return: requested task
-        :rtype: WorkflowTask|TaskGraph
-        :raise: TaskNotFoundError if no task found with given id
-        """
-        try:
-            data = self.graph.node[task_id]
-            return data['task']
-        except KeyError:
-            raise TaskNotFoundError('Task id: {0}'.format(task_id))
-
-    def remove_task(self, task):
-        """
-        Remove the provided task from the graph
-        :param WorkflowTask|graph task: The task
-        """
-        self.graph.remove_node(task.id)
-
-    def dependency(self, source_task, after):
-        """
-        Add a dependency between tasks.
-        The source task will only be executed after the target task terminates.
-        A source task may depend on several tasks,
-        In which case it will only be executed after all its target tasks will terminate.
-
-        tasks flow order:
-        after -> source_task
-
-        :param WorkflowTask|TaskGraph source_task: The source task
-        :type source_task: WorkflowTask
-        :param list after: The target task
-        :raise TaskNotInGraphError
-        """
-        if not self.graph.has_node(source_task.id):
-            raise TaskNotInGraphError(
-                'source task {0!r} is not in graph (task id: {0.id})'.format(source_task))
-        for target_task in after:
-            if not self.graph.has_node(target_task.id):
-                raise TaskNotInGraphError(
-                    'target task {0!r} is not in graph (task id: {0.id})'.format(target_task))
-            self.graph.add_edge(source_task.id, target_task.id)
-
-    # workflow creation helper methods
-    def chain(self, tasks, after=()):
-        """
-        create a chain of tasks.
-        tasks will be added to the graph with a dependency between
-        the tasks by order.
-
-        tasks flow order:
-        if tasks = (task0, task1, ..., taskn)
-        after -> task0 -> task1 -> ... -> taskn
-
-        :param tasks: list of WorkflowTask instances.
-        :param after: target to the sequence
-        """
-        for source_task in tasks:
-            self.add_task(source_task)
-            self.dependency(source_task, after=after)
-            after = (source_task,)
-
-    def fan_out(self, tasks, after=()):
-        """
-        create a fan-out.
-        tasks will be added to the graph with a dependency to
-        the target task.
-
-        tasks flow order:
-        if tasks = (task0, task1, ..., taskn)
-        after      -> task0
-                   |-> task1
-                   |...
-                   \-> taskn
-
-        :param tasks: list of WorkflowTask instances.
-        :param after: target to the tasks
-        """
-        for source_task in tasks:
-            self.add_task(source_task)
-            self.dependency(source_task, after=after)
-
-    def fan_in(self, source_task, after=None):
-        """
-        create a fan-in.
-        source task will be added to the graph with a dependency to
-        the tasks.
-
-        tasks flow order:
-        if after = (task0, task1, ..., taskn)
-        task0\
-        task1|-> source_task
-        ...  |
-        taskn/
-
-        :param source_task: source to the tasks
-        :param after: list of WorkflowTask instances.
-        """
-        self.add_task(source_task)
-        self.dependency(source_task, after=after)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/api/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/__init__.py b/tests/workflows/api/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/api/test_task_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/test_task_graph.py b/tests/workflows/api/test_task_graph.py
new file mode 100644
index 0000000..ba8862c
--- /dev/null
+++ b/tests/workflows/api/test_task_graph.py
@@ -0,0 +1,658 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from uuid import uuid4
+
+import pytest
+
+from aria.workflows.api.task_graph import TaskGraph, TaskNotInGraphError
+
+
+class MockTask(object):
+    def __init__(self):
+        self.id = str(uuid4())
+
+
+@pytest.fixture
+def graph():
+    return TaskGraph(name='mock-graph')
+
+
+class TestTaskGraphTasks(object):
+
+    def test_add_task(self, graph):
+        task = MockTask()
+        add_result = graph.add_task(task)
+        assert add_result is True
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_add_existing_task(self, graph):
+        task = MockTask()
+        graph.add_task(task)
+        # adding a task already in graph - should have no effect, and return False
+        add_result = graph.add_task(task)
+        assert add_result is False
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_remove_task(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(other_task)
+        graph.remove_task(other_task)
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_remove_task_with_dependency(self, graph):
+        task = MockTask()
+        dependent_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(dependent_task)
+        graph.add_dependency(dependent_task, task)
+        remove_result = graph.remove_task(dependent_task)
+        assert remove_result is True
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+        # asserting no dependencies are left for the dependent task
+        assert len(list(graph.get_task_dependencies(task))) == 0
+
+    def test_remove_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        # removing a task not in graph - should have no effect, and return False
+        remove_result = graph.remove_task(task_not_in_graph)
+        assert remove_result is False
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_has_task(self, graph):
+        task = MockTask()
+        graph.add_task(task)
+        assert graph.has_task(task) is True
+
+    def test_has_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        assert graph.has_task(task_not_in_graph) is False
+
+    def test_get_task(self, graph):
+        task = MockTask()
+        graph.add_task(task)
+        assert graph.get_task(task.id) == task
+
+    def test_get_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            graph.get_task(task_not_in_graph.id)
+
+
+class TestTaskGraphGraphTraversal(object):
+
+    def test_tasks_iteration(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(other_task)
+        tasks = [t for t in graph.tasks]
+        assert set(tasks) == {task, other_task}
+
+    def test_get_task_dependents(self, graph):
+        task = MockTask()
+        dependent_task_1 = MockTask()
+        dependent_task_2 = MockTask()
+        transitively_dependent_task = MockTask()
+
+        graph.add_task(task)
+        graph.add_task(dependent_task_1)
+        graph.add_task(dependent_task_2)
+        graph.add_task(transitively_dependent_task)
+
+        graph.add_dependency(dependent_task_1, task)
+        graph.add_dependency(dependent_task_2, task)
+        graph.add_dependency(transitively_dependent_task, dependent_task_2)
+
+        dependent_tasks = list(graph.get_task_dependents(task))
+        # transitively_dependent_task not expected to appear in the result
+        assert set(dependent_tasks) == {dependent_task_1, dependent_task_2}
+
+    def test_get_task_empty_dependents(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(other_task)
+        dependent_tasks = list(graph.get_task_dependents(task))
+        assert len(dependent_tasks) == 0
+
+    def test_get_nonexistent_task_dependents(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            list(graph.get_task_dependents(task_not_in_graph))
+
+    def test_get_task_dependencies(self, graph):
+        task = MockTask()
+        dependency_task_1 = MockTask()
+        dependency_task_2 = MockTask()
+        transitively_dependency_task = MockTask()
+
+        graph.add_task(task)
+        graph.add_task(dependency_task_1)
+        graph.add_task(dependency_task_2)
+        graph.add_task(transitively_dependency_task)
+
+        graph.add_dependency(task, dependency_task_1)
+        graph.add_dependency(task, dependency_task_2)
+        graph.add_dependency(dependency_task_2, transitively_dependency_task)
+
+        dependency_tasks = list(graph.get_task_dependencies(task))
+        # transitively_dependency_task not expected to appear in the result
+        assert set(dependency_tasks) == {dependency_task_1, dependency_task_2}
+
+    def test_get_task_empty_dependencies(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(other_task)
+        dependency_tasks = list(graph.get_task_dependencies(task))
+        assert len(dependency_tasks) == 0
+
+    def test_get_nonexistent_task_dependencies(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            list(graph.get_task_dependencies(task_not_in_graph))
+
+
+class TestTaskGraphDependencies(object):
+
+    def test_add_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        unrelated_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(dependency_task)
+        graph.add_task(unrelated_task)
+        add_result = graph.add_dependency(task, dependency_task)
+        assert add_result is True
+        dependency_tasks = list(graph.get_task_dependencies(task))
+        assert len(dependency_tasks) == 1
+        assert dependency_tasks[0] == dependency_task
+
+    def test_add_existing_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(dependency_task)
+        graph.add_dependency(task, dependency_task)
+        # adding a dependency already in graph - should have no effect, and return False
+        add_result = graph.add_dependency(task, dependency_task)
+        assert add_result is False
+        dependency_tasks = list(graph.get_task_dependencies(task))
+        assert len(dependency_tasks) == 1
+        assert dependency_tasks[0] == dependency_task
+
+    def test_add_dependency_nonexistent_dependent(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            graph.add_dependency(task_not_in_graph, task)
+
+    def test_add_dependency_nonexistent_dependency(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            graph.add_dependency(task, task_not_in_graph)
+
+    def test_has_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(dependency_task)
+        graph.add_dependency(task, dependency_task)
+        assert graph.has_dependency(task, dependency_task) is True
+
+    def test_has_nonexistent_dependency(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(other_task)
+        assert graph.has_dependency(task, other_task) is False
+
+    def test_has_dependency_nonexistent_dependent(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            graph.has_dependency(task_not_in_graph, task)
+
+    def test_has_dependency_nonexistent_dependency(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            graph.has_dependency(task, task_not_in_graph)
+
+    def test_remove_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        another_dependent_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(dependency_task)
+        graph.add_task(another_dependent_task)
+        graph.add_dependency(task, dependency_task)
+        graph.add_dependency(another_dependent_task, dependency_task)
+
+        remove_result = graph.remove_dependency(task, dependency_task)
+        assert remove_result is True
+        assert graph.has_dependency(task, dependency_task) is False
+        assert graph.has_dependency(another_dependent_task, dependency_task) 
is True
+
+    def test_remove_nonexistent_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        graph.add_task(task)
+        graph.add_task(dependency_task)
+        # removing a dependency not in graph - should have no effect, and 
return False
+        remove_result = graph.remove_dependency(task, dependency_task)
+        assert remove_result is False
+        tasks = [t for t in graph.tasks]
+        assert set(tasks) == {task, dependency_task}
+
+    def test_remove_dependency_nonexistent_dependent(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            graph.remove_dependency(task_not_in_graph, task)
+
+    def test_remove_dependency_nonexistent_dependency(self, graph):
+        # in this test the dependency *task* is not in the graph, not just the 
dependency itself
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_task(task)
+        with pytest.raises(TaskNotInGraphError):
+            graph.remove_dependency(task, task_not_in_graph)
+
+
+class TestTaskGraphArrangements(object):
+
+    def test_sequence(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        graph.sequence(*tasks)
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        assert len(list(graph.get_task_dependencies(tasks[0]))) == 0
+        assert set(graph.get_task_dependencies(tasks[1])) == {tasks[0]}
+        assert set(graph.get_task_dependencies(tasks[2])) == {tasks[1]}
+
+    def test_sequence_with_some_tasks_and_dependencies_already_in_graph(self, 
graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        # insert some tasks and dependencies to the graph
+        graph.add_task(tasks[1])
+        graph.add_task(tasks[2])
+        graph.add_dependency(tasks[2], tasks[1])
+
+        graph.sequence(*tasks)
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        assert len(list(graph.get_task_dependencies(tasks[0]))) == 0
+        assert set(graph.get_task_dependencies(tasks[1])) == {tasks[0]}
+        assert set(graph.get_task_dependencies(tasks[2])) == {tasks[1]}
+
+    def test_sequence_with_nested_sequence(self, graph):
+        tasks = [MockTask() for _ in xrange(5)]
+        graph.sequence(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), 
tasks[4])
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        # first task should have no dependencies
+        assert len(list(graph.get_task_dependencies(tasks[0]))) == 0
+        # rest of the tasks should have a single dependency - each depends on 
the one before it
+        for i in xrange(1, len(tasks)):
+            assert set(graph.get_task_dependencies(tasks[i])) == {tasks[i-1]}
+
+    def test_sequence_with_nested_parallel(self, graph):
+        tasks = [MockTask() for _ in xrange(5)]
+        graph.sequence(tasks[0], graph.parallel(tasks[1], tasks[2], tasks[3]), 
tasks[4])
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        # first task should have no dependencies
+        assert len(list(graph.get_task_dependencies(tasks[0]))) == 0
+        # rest of the tasks (except last) should have a single dependency - 
the first task
+        for i in xrange(1, 4):
+            assert set(graph.get_task_dependencies(tasks[i])) == {tasks[0]}
+        # last task should have a dependency on all tasks except for the first one
+        assert set(graph.get_task_dependencies(tasks[4])) == {tasks[1], 
tasks[2], tasks[3]}
+
+    def test_parallel(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        graph.parallel(*tasks)
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        for i in xrange(len(tasks)):
+            assert len(list(graph.get_task_dependencies(tasks[i]))) == 0
+
+    def test_parallel_with_some_tasks_and_dependencies_already_in_graph(self, 
graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+
+        # insert some tasks and dependencies to the graph
+        graph.add_task(tasks[1])
+        graph.add_task(tasks[2])
+
+        graph.parallel(*tasks)
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        for i in xrange(len(tasks)):
+            assert len(list(graph.get_task_dependencies(tasks[i]))) == 0
+
+    def test_parallel_with_nested_sequence(self, graph):
+        tasks = [MockTask() for _ in xrange(5)]
+        graph.parallel(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), 
tasks[4])
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+
+        # tasks[2] and tasks[3] should each have a single dependency; the rest 
should have none
+        assert len(list(graph.get_task_dependencies(tasks[0]))) == 0
+        assert len(list(graph.get_task_dependencies(tasks[1]))) == 0
+        assert set(graph.get_task_dependencies(tasks[2])) == {tasks[1]}
+        assert set(graph.get_task_dependencies(tasks[3])) == {tasks[2]}
+        assert len(list(graph.get_task_dependencies(tasks[4]))) == 0
+
+    def test_parallel_with_nested_parallel(self, graph):
+        tasks = [MockTask() for _ in xrange(5)]
+        graph.parallel(tasks[0], graph.parallel(tasks[1], tasks[2], tasks[3]), 
tasks[4])
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        # none of the tasks should have any dependencies
+        for i in xrange(len(tasks)):
+            assert len(list(graph.get_task_dependencies(tasks[i]))) == 0
+
+    def test_sequence_and_parallel_deep_nesting(self, graph):
+        """Placeholder for deeply nested sequence/parallel arrangements.
+
+        Planned coverage includes connecting a sequence to a parallel. Until the
+        test is written it is reported as skipped instead of as a vacuous pass.
+        """
+        pytest.skip('deep nesting of sequence and parallel is not covered yet')
+
+
+class TestTaskGraphDependenciesWithArrangements(object):
+
+    def test_add_dependency_dependent_sequence(self, graph):
+        task = MockTask()
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        sequence = graph.sequence(*sequence_tasks)
+        graph.add_dependency(sequence, task)
+        assert graph.has_dependency(sequence_tasks[0], task) is True
+        assert graph.has_dependency(sequence_tasks[1], task) is False
+        assert graph.has_dependency(sequence_tasks[2], task) is False
+
+    def test_add_dependency_dependency_sequence(self, graph):
+        task = MockTask()
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        sequence = graph.sequence(*sequence_tasks)
+        graph.add_dependency(task, sequence)
+        assert graph.has_dependency(task, sequence_tasks[0]) is False
+        assert graph.has_dependency(task, sequence_tasks[1]) is False
+        assert graph.has_dependency(task, sequence_tasks[2]) is True
+
+    def test_add_dependency_dependent_parallel(self, graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(parallel, task)
+        assert graph.has_dependency(parallel_tasks[0], task) is True
+        assert graph.has_dependency(parallel_tasks[1], task) is True
+        assert graph.has_dependency(parallel_tasks[2], task) is True
+
+    def test_add_dependency_dependency_parallel(self, graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(task, parallel)
+        assert graph.has_dependency(task, parallel_tasks[0]) is True
+        assert graph.has_dependency(task, parallel_tasks[1]) is True
+        assert graph.has_dependency(task, parallel_tasks[2]) is True
+
+    def test_add_dependency_between_sequence_and_parallel(self, graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(sequence, parallel)
+        for parallel_task in parallel_tasks:
+            assert graph.has_dependency(sequence_tasks[0], parallel_task) is 
True
+            assert graph.has_dependency(sequence_tasks[1], parallel_task) is 
False
+            assert graph.has_dependency(sequence_tasks[2], parallel_task) is 
False
+
+    def test_add_dependency_between_parallel_and_sequence(self, graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(parallel, sequence)
+        for parallel_task in parallel_tasks:
+            assert graph.has_dependency(parallel_task, sequence_tasks[0]) is 
False
+            assert graph.has_dependency(parallel_task, sequence_tasks[1]) is 
False
+            assert graph.has_dependency(parallel_task, sequence_tasks[2]) is 
True
+
+    def 
test_add_dependency_dependency_parallel_with_some_existing_dependencies(self, 
graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        # adding a dependency on a specific task manually,
+        # before adding a dependency on the whole parallel
+        graph.add_dependency(task, parallel_tasks[1])
+        graph.add_dependency(task, parallel)
+        assert graph.has_dependency(task, parallel_tasks[0]) is True
+        assert graph.has_dependency(task, parallel_tasks[1]) is True
+        assert graph.has_dependency(task, parallel_tasks[2]) is True
+
+    def test_add_existing_dependency_between_sequence_and_parallel(self, 
graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        add_result = graph.add_dependency(sequence, parallel)
+        assert add_result is True
+        # adding a dependency already in graph - should have no effect, and 
return False
+        add_result = graph.add_dependency(sequence, parallel)
+        assert add_result is False
+        for parallel_task in parallel_tasks:
+            assert graph.has_dependency(sequence_tasks[0], parallel_task) is 
True
+            assert graph.has_dependency(sequence_tasks[1], parallel_task) is 
False
+            assert graph.has_dependency(sequence_tasks[2], parallel_task) is 
False
+
+    def test_has_dependency_dependent_sequence(self, graph):
+        task = MockTask()
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        sequence = graph.sequence(*sequence_tasks)
+        assert graph.has_dependency(sequence, task) is False
+        graph.add_dependency(sequence, task)
+        assert graph.has_dependency(sequence, task) is True
+
+    def test_has_dependency_dependency_sequence(self, graph):
+        task = MockTask()
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        sequence = graph.sequence(*sequence_tasks)
+        assert graph.has_dependency(task, sequence) is False
+        graph.add_dependency(task, sequence)
+        assert graph.has_dependency(task, sequence) is True
+
+    def test_has_dependency_dependent_parallel(self, graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        assert graph.has_dependency(parallel, task) is False
+        graph.add_dependency(parallel, task)
+        assert graph.has_dependency(parallel, task) is True
+
+    def test_has_dependency_dependency_parallel(self, graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        assert graph.has_dependency(task, parallel) is False
+        graph.add_dependency(task, parallel)
+        assert graph.has_dependency(task, parallel) is True
+
+    def test_has_dependency_between_sequence_and_parallel(self, graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        assert graph.has_dependency(sequence, parallel) is False
+        graph.add_dependency(sequence, parallel)
+        assert graph.has_dependency(sequence, parallel) is True
+
+    def test_has_dependency_between_parallel_and_sequence(self, graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        assert graph.has_dependency(parallel, sequence) is False
+        graph.add_dependency(parallel, sequence)
+        assert graph.has_dependency(parallel, sequence) is True
+
+    def 
test_has_dependency_dependency_parallel_with_some_existing_dependencies(self, 
graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(task, parallel_tasks[1])
+        # only a partial dependency exists - has_dependency is expected to 
return False
+        assert graph.has_dependency(task, parallel) is False
+
+    def test_has_nonexistent_dependency_between_sequence_and_parallel(self, 
graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        assert graph.has_dependency(sequence, parallel) is False
+
+    def test_remove_dependency_dependent_sequence(self, graph):
+        task = MockTask()
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        sequence = graph.sequence(*sequence_tasks)
+        graph.add_dependency(sequence, task)
+        remove_result = graph.remove_dependency(sequence, task)
+        assert remove_result is True
+        assert graph.has_dependency(sequence_tasks[0], task) is False
+        assert graph.has_dependency(sequence_tasks[1], task) is False
+        assert graph.has_dependency(sequence_tasks[2], task) is False
+
+    def test_remove_dependency_dependency_sequence(self, graph):
+        task = MockTask()
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        sequence = graph.sequence(*sequence_tasks)
+        graph.add_dependency(task, sequence)
+        remove_result = graph.remove_dependency(task, sequence)
+        assert remove_result is True
+        assert graph.has_dependency(task, sequence_tasks[0]) is False
+        assert graph.has_dependency(task, sequence_tasks[1]) is False
+        assert graph.has_dependency(task, sequence_tasks[2]) is False
+
+    def test_remove_dependency_dependent_parallel(self, graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(parallel, task)
+        remove_result = graph.remove_dependency(parallel, task)
+        assert remove_result is True
+        assert graph.has_dependency(parallel_tasks[0], task) is False
+        assert graph.has_dependency(parallel_tasks[1], task) is False
+        assert graph.has_dependency(parallel_tasks[2], task) is False
+
+    def test_remove_dependency_dependency_parallel(self, graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(task, parallel)
+        remove_result = graph.remove_dependency(task, parallel)
+        assert remove_result is True
+        assert graph.has_dependency(task, parallel_tasks[0]) is False
+        assert graph.has_dependency(task, parallel_tasks[1]) is False
+        assert graph.has_dependency(task, parallel_tasks[2]) is False
+
+    def test_remove_dependency_between_sequence_and_parallel(self, graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(sequence, parallel)
+        remove_result = graph.remove_dependency(sequence, parallel)
+        assert remove_result is True
+        for parallel_task in parallel_tasks:
+            assert graph.has_dependency(sequence_tasks[0], parallel_task) is 
False
+            assert graph.has_dependency(sequence_tasks[1], parallel_task) is 
False
+            assert graph.has_dependency(sequence_tasks[2], parallel_task) is 
False
+
+    def test_remove_dependency_between_parallel_and_sequence(self, graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(parallel, sequence)
+        remove_result = graph.remove_dependency(parallel, sequence)
+        assert remove_result is True
+        for parallel_task in parallel_tasks:
+            assert graph.has_dependency(parallel_task, sequence_tasks[0]) is 
False
+            assert graph.has_dependency(parallel_task, sequence_tasks[1]) is 
False
+            assert graph.has_dependency(parallel_task, sequence_tasks[2]) is 
False
+
+    def 
test_remove_dependency_dependency_parallel_with_some_existing_dependencies(self,
 graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_task(task)
+        parallel = graph.parallel(*parallel_tasks)
+        graph.add_dependency(task, parallel_tasks[1])
+        remove_result = graph.remove_dependency(task, parallel)
+        # only a partial dependency exists - remove_dependency is expected to 
return False
+        assert remove_result is False
+        # no dependencies are expected to have changed
+        assert graph.has_dependency(task, parallel_tasks[0]) is False
+        assert graph.has_dependency(task, parallel_tasks[1]) is True
+        assert graph.has_dependency(task, parallel_tasks[2]) is False
+
+    def test_remove_nonexistent_dependency_between_sequence_and_parallel(self, 
graph):
+        sequence_tasks = [MockTask() for _ in xrange(3)]
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        sequence = graph.sequence(*sequence_tasks)
+        parallel = graph.parallel(*parallel_tasks)
+        # removing a dependency not in graph - should have no effect, and 
return False
+        remove_result = graph.remove_dependency(sequence, parallel)
+        assert remove_result is False

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/__init__.py b/tests/workflows/core/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/core/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_executor.py 
b/tests/workflows/core/test_executor.py
new file mode 100644
index 0000000..27cb2ad
--- /dev/null
+++ b/tests/workflows/core/test_executor.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import uuid
+
+import pytest
+import retrying
+
+from aria import events
+from aria.storage import models
+from aria.workflows.core import executor
+
+
+class TestExecutor(object):
+    """Runs the same three mock tasks through each executor implementation."""
+
+    @pytest.mark.parametrize('pool_size,executor_cls', [
+        (1, executor.ThreadExecutor),
+        (2, executor.ThreadExecutor),
+        (1, executor.MultiprocessExecutor),
+        (2, executor.MultiprocessExecutor),
+        (0, executor.CurrentThreadBlockingExecutor)
+    ])
+    def test_execute(self, pool_size, executor_cls):
+        """Execute a passing, a failing and an input-driven task.
+
+        The signal handlers connected in ``setup_method`` record the states on
+        each ``MockTask``; the assertions check those recorded states and the
+        exception captured by the failure handler.
+        """
+        self.executor = executor_cls(pool_size)
+        expected_value = 'value'
+        successful_task = MockTask(mock_successful_task)
+        failing_task = MockTask(mock_failing_task)
+        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+        for task in [successful_task, failing_task, task_with_inputs]:
+            self.executor.execute(task)
+
+        # results may arrive after execute() returns, so the assertions are
+        # retried every 100ms for up to 10 seconds before the test fails
+        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+        def assertion():
+            assert successful_task.states == ['start', 'success']
+            assert failing_task.states == ['start', 'failure']
+            assert task_with_inputs.states == ['start', 'failure']
+            assert isinstance(failing_task.exception, TestException)
+            assert isinstance(task_with_inputs.exception, TestException)
+            assert task_with_inputs.exception.message == expected_value
+        assertion()
+
+    def setup_method(self):
+        """Connect the recording handlers to the task signals."""
+        events.start_task_signal.connect(start_handler)
+        events.on_success_task_signal.connect(success_handler)
+        events.on_failure_task_signal.connect(failure_handler)
+
+    def teardown_method(self):
+        """Disconnect the handlers and close the executor, if one was created."""
+        events.start_task_signal.disconnect(start_handler)
+        events.on_success_task_signal.disconnect(success_handler)
+        events.on_failure_task_signal.disconnect(failure_handler)
+        # ``self.executor`` is only set inside the test; if the executor's
+        # constructor raised, a plain attribute access here would fail with
+        # AttributeError and hide the original error
+        created_executor = getattr(self, 'executor', None)
+        if created_executor:
+            created_executor.close()
+
+
+def mock_successful_task():
+    # operation that finishes without raising
+    pass
+
+
+def mock_failing_task():
+    # operation that always fails with a TestException (no message)
+    raise TestException
+
+
+def mock_task_with_input(input):
+    # operation that fails with a TestException carrying the received input, so
+    # the test can verify the input value reached the operation
+    raise TestException(input)
+
+
+class TestException(Exception):
+    # exception type raised by the failing mock operations in this module
+    pass
+
+
+class MockContext(object):
+    # context stand-in: keeps the operation details and inputs it is given and
+    # creates a models.Operation with an empty execution id
+
+    def __init__(self, operation_details, inputs):
+        self.operation_details = operation_details
+        self.inputs = inputs
+        self.operation = models.Operation(execution_id='')
+
+
+class MockTask(object):
+    """Task stand-in that records the states and exception reported for it.
+
+    :param func: module-level function the task should run; its dotted path
+                 is stored under the ``'operation'`` key of the operation details
+    :param inputs: optional dict of inputs for the operation (defaults to ``{}``)
+    """
+
+    def __init__(self, func, inputs=None):
+        self.states = []
+        self.exception = None
+        self.id = str(uuid.uuid4())
+        name = func.__name__
+        # build the dotted path from the module the function really lives in;
+        # the previously hard-coded 'tests.workflows.test_executor' prefix went
+        # stale when this module moved into tests/workflows/core
+        operation = '{module}.{name}'.format(module=func.__module__, name=name)
+        self.context = MockContext(operation_details={'operation': operation},
+                                   inputs=inputs or {})
+        self.logger = logging.getLogger()
+        self.name = name
+
+
+def start_handler(task, *args, **kwargs):
+    # connected to events.start_task_signal; records 'start' on the task
+    task.states.append('start')
+
+
+def success_handler(task, *args, **kwargs):
+    # connected to events.on_success_task_signal; records 'success' on the task
+    task.states.append('success')
+
+
+def failure_handler(task, exception, *args, **kwargs):
+    # connected to events.on_failure_task_signal; records 'failure' and keeps
+    # the exception so the test can inspect its type and message
+    task.states.append('failure')
+    task.exception = exception

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/core/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py 
b/tests/workflows/core/test_task_graph_into_exececution_graph.py
new file mode 100644
index 0000000..deab4a3
--- /dev/null
+++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+from networkx import topological_sort, DiGraph
+
+from aria import contexts
+from aria.workflows.api import task_graph
+from aria.workflows.core import tasks, translation
+
+
+# applied to every test in this module (autouse): replaces
+# OperationTask._create_operation_in_storage with a no-op that returns None
+@pytest.fixture(autouse=True)
+def no_storage(monkeypatch):
+    monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage',
+                        value=lambda *args, **kwargs: None)
+
+
+def test_task_graph_into_execution_graph():
+    """Translate a task graph containing a nested graph and check the result.
+
+    The expected topological order of the execution graph is: workflow start,
+    the "before" task, sub-workflow start, the inner task, sub-workflow end,
+    the "after" task, workflow end.
+    """
+    # the local must not be named ``task_graph``: assigning to that name inside
+    # the function hides the imported module and the call below would raise
+    # UnboundLocalError
+    outer_task_graph = task_graph.TaskGraph('test_task_graph')
+    simple_before_task = contexts.OperationContext('test_simple_before_task', {}, {}, None)
+    simple_after_task = contexts.OperationContext('test_simple_after_task', {}, {}, None)
+
+    inner_task_graph = task_graph.TaskGraph('test_inner_task_graph')
+    inner_task = contexts.OperationContext('test_inner_task', {}, {}, None)
+    inner_task_graph.add_task(inner_task)
+
+    outer_task_graph.add_task(simple_before_task)
+    outer_task_graph.add_task(simple_after_task)
+    outer_task_graph.add_task(inner_task_graph)
+    # NOTE(review): tests/workflows/api/test_task_graph.py exercises
+    # ``add_dependency``; confirm ``dependency`` is still part of TaskGraph
+    outer_task_graph.dependency(inner_task_graph, [simple_before_task])
+    outer_task_graph.dependency(simple_after_task, [inner_task_graph])
+
+    # Direct check
+    execution_graph = DiGraph()
+    translation.build_execution_graph(task_graph=outer_task_graph,
+                                      workflow_context=None,
+                                      execution_graph=execution_graph)
+    execution_tasks = topological_sort(execution_graph)
+
+    assert len(execution_tasks) == 7
+
+    expected_tasks_names = [
+        '{0}-Start'.format(outer_task_graph.id),
+        simple_before_task.id,
+        '{0}-Start'.format(inner_task_graph.id),
+        inner_task.id,
+        '{0}-End'.format(inner_task_graph.id),
+        simple_after_task.id,
+        '{0}-End'.format(outer_task_graph.id)
+    ]
+
+    assert expected_tasks_names == execution_tasks
+
+    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
+                      tasks.StartWorkflowTask)
+    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
+                      tasks.StartSubWorkflowTask)
+    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
+                      tasks.EndSubWorkflowTask)
+    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
+                      tasks.EndWorkflowTask)
+
+
+def _get_task_by_name(task_name, graph):
+    # look up the object stored under the 'task' attribute of the named node
+    return graph.node[task_name]['task']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_executor.py b/tests/workflows/test_executor.py
deleted file mode 100644
index 27cb2ad..0000000
--- a/tests/workflows/test_executor.py
+++ /dev/null
@@ -1,117 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import uuid
-
-import pytest
-import retrying
-
-from aria import events
-from aria.storage import models
-from aria.workflows.core import executor
-
-
-class TestExecutor(object):
-
-    @pytest.mark.parametrize('pool_size,executor_cls', [
-        (1, executor.ThreadExecutor),
-        (2, executor.ThreadExecutor),
-        (1, executor.MultiprocessExecutor),
-        (2, executor.MultiprocessExecutor),
-        (0, executor.CurrentThreadBlockingExecutor)
-    ])
-    def test_execute(self, pool_size, executor_cls):
-        self.executor = executor_cls(pool_size)
-        expected_value = 'value'
-        successful_task = MockTask(mock_successful_task)
-        failing_task = MockTask(mock_failing_task)
-        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
-
-        for task in [successful_task, failing_task, task_with_inputs]:
-            self.executor.execute(task)
-
-        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
-        def assertion():
-            assert successful_task.states == ['start', 'success']
-            assert failing_task.states == ['start', 'failure']
-            assert task_with_inputs.states == ['start', 'failure']
-            assert isinstance(failing_task.exception, TestException)
-            assert isinstance(task_with_inputs.exception, TestException)
-            assert task_with_inputs.exception.message == expected_value
-        assertion()
-
-    def setup_method(self):
-        events.start_task_signal.connect(start_handler)
-        events.on_success_task_signal.connect(success_handler)
-        events.on_failure_task_signal.connect(failure_handler)
-
-    def teardown_method(self):
-        events.start_task_signal.disconnect(start_handler)
-        events.on_success_task_signal.disconnect(success_handler)
-        events.on_failure_task_signal.disconnect(failure_handler)
-        if self.executor:
-            self.executor.close()
-
-
-def mock_successful_task():
-    pass
-
-
-def mock_failing_task():
-    raise TestException
-
-
-def mock_task_with_input(input):
-    raise TestException(input)
-
-
-class TestException(Exception):
-    pass
-
-
-class MockContext(object):
-
-    def __init__(self, operation_details, inputs):
-        self.operation_details = operation_details
-        self.inputs = inputs
-        self.operation = models.Operation(execution_id='')
-
-
-class MockTask(object):
-
-    def __init__(self, func, inputs=None):
-        self.states = []
-        self.exception = None
-        self.id = str(uuid.uuid4())
-        name = func.__name__
-        operation = 'tests.workflows.test_executor.{name}'.format(name=name)
-        self.context = MockContext(operation_details={'operation': operation},
-                                   inputs=inputs or {})
-        self.logger = logging.getLogger()
-        self.name = name
-
-
-def start_handler(task, *args, **kwargs):
-    task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
-    task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
-    task.states.append('failure')
-    task.exception = exception

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_task_graph_into_exececution_graph.py b/tests/workflows/test_task_graph_into_exececution_graph.py
deleted file mode 100644
index 1bae713..0000000
--- a/tests/workflows/test_task_graph_into_exececution_graph.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import pytest
-from networkx import topological_sort, DiGraph
-
-from aria import contexts
-from aria.workflows.api import tasks_graph
-from aria.workflows.core import tasks, translation
-
-
-@pytest.fixture(autouse=True)
-def no_storage(monkeypatch):
-    monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage',
-                        value=lambda *args, **kwargs: None)
-
-
-def test_task_graph_into_execution_graph():
-    task_graph = tasks_graph.TaskGraph('test_task_graph')
-    simple_before_task = contexts.OperationContext('test_simple_before_task', {}, {}, None)
-    simple_after_task = contexts.OperationContext('test_simple_after_task', {}, {}, None)
-
-    inner_task_graph = tasks_graph.TaskGraph('test_inner_task_graph')
-    inner_task = contexts.OperationContext('test_inner_task', {}, {}, None)
-    inner_task_graph.add_task(inner_task)
-
-    task_graph.add_task(simple_before_task)
-    task_graph.add_task(simple_after_task)
-    task_graph.add_task(inner_task_graph)
-    task_graph.dependency(inner_task_graph, [simple_before_task])
-    task_graph.dependency(simple_after_task, [inner_task_graph])
-
-    # Direct check
-    execution_graph = DiGraph()
-    translation.build_execution_graph(task_graph=task_graph,
-                                      workflow_context=None,
-                                      execution_graph=execution_graph)
-    execution_tasks = topological_sort(execution_graph)
-
-    assert len(execution_tasks) == 7
-
-    expected_tasks_names = [
-        '{0}-Start'.format(task_graph.id),
-        simple_before_task.id,
-        '{0}-Start'.format(inner_task_graph.id),
-        inner_task.id,
-        '{0}-End'.format(inner_task_graph.id),
-        simple_after_task.id,
-        '{0}-End'.format(task_graph.id)
-    ]
-
-    assert expected_tasks_names == execution_tasks
-
-    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
-                      tasks.StartWorkflowTask)
-    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
-                      tasks.StartSubWorkflowTask)
-    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), tasks.
-                      EndSubWorkflowTask)
-    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), tasks.EndWorkflowTask)
-
-
-def _get_task_by_name(task_name, graph):
-    return graph.node[task_name]['task']

Reply via email to