ARIA-4 Create an API for creating workflows An API for creating workflows. Users can build graphs of tasks and set depenedencies in between tasks to execute them in a specific order.
This commit also includes minimal reorganization of a few test modules, so they are now using the same file system hierarchy as the modules which they test. Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/107c9729 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/107c9729 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/107c9729 Branch: refs/heads/ARIA-3-api-for-creating-workflows Commit: 107c972981dbd2ce0bbc3ea55e2801049031a581 Parents: e1c919d Author: Ran Ziv <r...@gigaspaces.com> Authored: Fri Oct 21 13:21:25 2016 +0300 Committer: Ran Ziv <r...@gigaspaces.com> Committed: Sun Oct 30 08:51:19 2016 +0200 ---------------------------------------------------------------------- aria/contexts.py | 2 +- aria/workflows/api/task_graph.py | 305 +++++++++ aria/workflows/api/tasks_graph.py | 203 ------ tests/workflows/api/__init__.py | 0 tests/workflows/api/test_task_graph.py | 658 +++++++++++++++++++ tests/workflows/core/__init__.py | 0 tests/workflows/core/test_executor.py | 117 ++++ .../test_task_graph_into_exececution_graph.py | 79 +++ tests/workflows/test_executor.py | 117 ---- .../test_task_graph_into_exececution_graph.py | 79 --- 10 files changed, 1160 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/aria/contexts.py ---------------------------------------------------------------------- diff --git a/aria/contexts.py b/aria/contexts.py index ae7fc66..3bda9be 100644 --- a/aria/contexts.py +++ b/aria/contexts.py @@ -21,7 +21,7 @@ from uuid import uuid4 from aria.logger import LoggerMixin from aria.tools.lru_cache import lru_cache -from aria.workflows.api.tasks_graph import TaskGraph +from aria.workflows.api.task_graph import TaskGraph class WorkflowContext(LoggerMixin): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/aria/workflows/api/task_graph.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task_graph.py b/aria/workflows/api/task_graph.py new file mode 100644 index 0000000..fdc4e2b --- /dev/null +++ b/aria/workflows/api/task_graph.py @@ -0,0 +1,305 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Task graph. Used by users to build workflows +""" + +from uuid import uuid4 + +from networkx import DiGraph + + +class TaskNotInGraphError(Exception): + """ + An error representing a scenario where a given task is not in the graph as expected + """ + pass + + +class TaskGraph(object): + """ + A tasks graph builder. + Build an operations flow graph + """ + + def __init__(self, name): + self.name = name + self._id = str(uuid4()) + self._graph = DiGraph() + + def __repr__(self): + return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format( + name=self.__class__.__name__, self=self) + + # graph traversal methods + + @property + def tasks(self): + """ + An iterator on tasks added to the graph + :yields: Iterator over all tasks in the graph + """ + for _, data in self._graph.nodes_iter(data=True): + yield data['task'] + + def get_task_dependencies(self, dependent_task): + """ + Iterates over the task's dependencies + :param BaseTask dependent_task: The task whose dependencies are requested + :yields: Iterator over all tasks which dependency_task depends on + :raise: TaskNotInGraphError if dependent_task is not in the graph + """ + if not self.has_task(dependent_task): + raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id)) + for _, dependency_id in self._graph.out_edges_iter(dependent_task.id): + yield self.get_task(dependency_id) + + def get_task_dependents(self, dependency_task): + """ + Iterates over the task's dependents + :param BaseTask dependency_task: The task whose dependents are requested + :yields: Iterator over all tasks which depend on dependency_task + :raise: TaskNotInGraphError if dependency_task is not in the graph + """ + if not self.has_task(dependency_task): + raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id)) + for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id): + yield self.get_task(dependent_id) + + # task methods + + def get_task(self, task_id): + """ + Get a task instance that's been inserted to the graph by the task's id + :param basestring task_id: The task's id + :return: Requested task + :rtype: BaseTask + :raise: TaskNotInGraphError if no task found in the graph with the given id + """ + if not self._graph.has_node(task_id): + raise TaskNotInGraphError('Task id: {0}'.format(task_id)) + data = self._graph.node[task_id] + return data['task'] + + def add_task(self, task): + """ + Add a task to the graph + :param BaseTask task: The task + :return: False if task was already in the graph, otherwise True + :rtype: bool + """ + if self.has_task(task): + return False + self._graph.add_node(task.id, task=task) + return True + + def remove_task(self, task): + """ + Remove the provided task from the graph + :param BaseTask task: The task + :return: False if task wasn't in the graph, otherwise True + :rtype: bool + """ + if not self.has_task(task): + return False + self._graph.remove_node(task.id) + return True + + def has_task(self, task): + """ + Check whether a task is in the graph or not + :param BaseTask task: The task + :return: True if task is in the graph, otherwise True + :rtype: bool + """ + return self._graph.has_node(task.id) + + def add_dependency(self, dependent, dependency): + """ + Add a dependency for one item (task, sequence or parallel) on another + The dependent will only be executed after the dependency terminates + If either of the items is either a sequence or a parallel, + multiple dependencies may be added + :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) + :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) + :return: True if the dependency between the two hadn't already existed, otherwise False + :rtype: bool + :raise TaskNotInGraphError if either the dependent or dependency are tasks which + are not in the graph + """ + if self.has_dependency(dependent, dependency): + return False + + dependent_tasks, dependency_tasks = \ + self._extract_dependent_and_dependency_tasks(dependent, dependency) + for dependent_task in dependent_tasks: + for dependency_task in dependency_tasks: + self._graph.add_edge(dependent_task.id, dependency_task.id) + return True + + def has_dependency(self, dependent, dependency): + """ + Check whether one item (task, sequence or parallel) depends on another + + Note that if either of the items is either a sequence or a parallel, + and some of the dependencies exist in the graph but not all of them, + this method will return False + + :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) + :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) + :return: True if the dependency between the two exists, otherwise False + :rtype: bool + :raise TaskNotInGraphError if either the dependent or dependency are tasks + which are not in the graph + """ + dependent_tasks, dependency_tasks = \ + self._extract_dependent_and_dependency_tasks(dependent, dependency) + for dependent_task in dependent_tasks: + for dependency_task in dependency_tasks: + if not self._graph.has_edge(dependent_task.id, dependency_task.id): + return False + return True + + def remove_dependency(self, dependent, dependency): + """ + Remove a dependency for one item (task, sequence or parallel) on another + + Note that if either of the items is either a sequence or a parallel, and some of + the dependencies exist in the graph but not all of them, this method will not remove + any of the dependencies and return False + + :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) + :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) + :return: False if the dependency between the two hadn't existed, otherwise True + :rtype: bool + :raise TaskNotInGraphError if either the dependent or dependency are tasks + which are not in the graph + """ + if not self.has_dependency(dependent, dependency): + return False + + dependent_tasks, dependency_tasks = \ + self._extract_dependent_and_dependency_tasks(dependent, dependency) + for dependent_task in dependent_tasks: + for dependency_task in dependency_tasks: + self._graph.remove_edge(dependent_task.id, dependency_task.id) + return True + + def sequence(self, item, *items): + """ + A graph-building helper, useful for chaining items in a sequence - + it'll add a dependency for each item on the one that came before it + + an item may be either a task, a sequence or a parallel + this method will also add to the graph each of the items not already present in it + + :param BaseTask|_TasksArrangement item: The first item (task, sequence or parallel) + :param BaseTask|_TasksArrangement *items: The rest of the items (task, sequence or parallel) + :return: The sequence object + :rtype: _TasksSequence + """ + all_items = [item] + list(items) + prev_item = None + + for an_item in all_items: + if not isinstance(an_item, TaskGraph._TasksArrangement) and not self.has_task(an_item): + self.add_task(an_item) + if prev_item is not None: + self.add_dependency(an_item, prev_item) + prev_item = an_item + + return self._TasksSequence(all_items) + + def parallel(self, item, *items): + """ + A graph-building helper, useful for allowing for several items to execute in parallel + this method won't actually add any dependency between the items passed to it - + but the returned object may be used to add dependencies for and on this group of items + + an item may be either a task, a sequence or a parallel + this method will also add to the graph each of the items not already present in it + + :param BaseTask|_TasksArrangement item: One item (task, sequence or parallel) + :param BaseTask|_TasksArrangement *items: The rest of the items (task, sequence or parallel) + :return: The parallel object + :rtype: _TasksParallel + """ + all_items = [item] + list(items) + + for an_item in all_items: + if not isinstance(an_item, TaskGraph._TasksArrangement) and not self.has_task(an_item): + self.add_task(an_item) + + return self._TasksParallel(all_items) + + def _extract_dependent_and_dependency_tasks(self, dependent, dependency): + if isinstance(dependent, TaskGraph._TasksArrangement): + dependent_tasks = dependent.get_first_tasks() + else: + if not self._graph.has_node(dependent.id): + raise TaskNotInGraphError( + 'dependent task {0!r} is not in graph (task id: {0.id})'.format(dependent)) + dependent_tasks = [dependent] + + if isinstance(dependency, TaskGraph._TasksArrangement): + dependency_tasks = dependency.get_last_tasks() + else: + if not self._graph.has_node(dependency.id): + raise TaskNotInGraphError( + 'dependency task {0!r} is not in graph (task id: {0.id})'.format(dependency)) + dependency_tasks = [dependency] + + return dependent_tasks, dependency_tasks + + class _TasksArrangement(object): + def __init__(self, tasks): + self._tasks = tasks + + def get_first_tasks(self): + """ + The first tasks to be executed in the arrangement + :rtype list + :return: list of the first tasks to be executed in the arrangement + """ + raise NotImplementedError() + + def get_last_tasks(self): + """ + The last tasks to be executed in the arrangement + :rtype list + :return: list of the last tasks to be executed in the arrangement + """ + raise NotImplementedError() + + class _TasksSequence(_TasksArrangement): + def __init__(self, tasks): + super(TaskGraph._TasksSequence, self).__init__(tasks) + + def get_first_tasks(self): + return [self._tasks[0]] + + def get_last_tasks(self): + return [self._tasks[-1]] + + class _TasksParallel(_TasksArrangement): + def __init__(self, tasks): + super(TaskGraph._TasksParallel, self).__init__(tasks) + + def get_first_tasks(self): + return self._tasks + + def get_last_tasks(self): + return self._tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/aria/workflows/api/tasks_graph.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/tasks_graph.py b/aria/workflows/api/tasks_graph.py deleted file mode 100644 index 5160345..0000000 --- a/aria/workflows/api/tasks_graph.py +++ /dev/null @@ -1,203 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from uuid import uuid4 - -from networkx import DiGraph, topological_sort - -from aria.tools.validation import ValidatorMixin - - -class TaskNotFoundError(Exception): - pass - - -class TaskNotInGraphError(Exception): - pass - - -class TaskGraph(ValidatorMixin): - """ - A task graph builder. - Build a operations flow graph - """ - - def __init__(self, name): - self.name = name - self.id = str(uuid4()) - self.graph = DiGraph() - - def __getattr__(self, attr): - try: - return getattr(self.graph, attr) - except AttributeError: - return super(TaskGraph, self).__getattribute__(attr) - - def __repr__(self): - return '{name}(id={self.id}, name={self.name}, graph={self.graph!r})'.format( - name=self.__class__.__name__, self=self) - - @property - def tasks(self): - """ - An iterator on tasks added to the graph - """ - for _, data in self.graph.nodes_iter(data=True): - yield data['task'] - - @property - def leaf_tasks(self): - for task in self.tasks_in_order(): - if not self.graph.predecessors(task.id): - yield task - - def task_tree(self, reverse=False): - """ - Iterates over the tasks to be executed in topological order and their dependencies. - :param reverse: reverse the order - """ - for task in self.tasks_in_order(reverse=reverse): - yield task, self.task_dependencies(task) - - def tasks_in_order(self, reverse=False): - """ - Iterates over the tasks to be executed in topological order - :param reverse: reverse the order - """ - for task_id in topological_sort(self.graph, reverse=reverse): - yield self.graph.node[task_id]['task'] - - def has_dependencies(self, task): - return len(self.task_dependencies(task)) > 0 - - def task_dependencies(self, task): - """ - Iterates over the task dependencies - """ - for task_ids in self.graph.edges_iter(task.id): - for task_id in task_ids: - if task.id != task_id: - yield self.get_task(task_id) - - def add_task(self, task): - """ - Add a task to this graph - :param WorkflowTask|TaskGraph task: The task - """ - self.graph.add_node(task.id, task=task) - - def get_task(self, task_id): - """ - Get a task instance that was inserted to this graph by its id - - :param basestring task_id: the task id - :return: requested task - :rtype: WorkflowTask|TaskGraph - :raise: TaskNotFoundError if no task found with given id - """ - try: - data = self.graph.node[task_id] - return data['task'] - except KeyError: - raise TaskNotFoundError('Task id: {0}'.format(task_id)) - - def remove_task(self, task): - """ - Remove the provided task from the graph - :param WorkflowTask|graph task: The task - """ - self.graph.remove_node(task.id) - - def dependency(self, source_task, after): - """ - Add a dependency between tasks. - The source task will only be executed after the target task terminates. - A source task may depend on several tasks, - In which case it will only be executed after all its target tasks will terminate. - - tasks flow order: - after -> source_task - - :param WorkflowTask|TaskGraph source_task: The source task - :type source_task: WorkflowTask - :param list after: The target task - :raise TaskNotInGraphError - """ - if not self.graph.has_node(source_task.id): - raise TaskNotInGraphError( - 'source task {0!r} is not in graph (task id: {0.id})'.format(source_task)) - for target_task in after: - if not self.graph.has_node(target_task.id): - raise TaskNotInGraphError( - 'target task {0!r} is not in graph (task id: {0.id})'.format(target_task)) - self.graph.add_edge(source_task.id, target_task.id) - - # workflow creation helper methods - def chain(self, tasks, after=()): - """ - create a chain of tasks. - tasks will be added to the graph with a dependency between - the tasks by order. - - tasks flow order: - if tasks = (task0, task1, ..., taskn) - after -> task0 -> task1 -> ... -> taskn - - :param tasks: list of WorkflowTask instances. - :param after: target to the sequence - """ - for source_task in tasks: - self.add_task(source_task) - self.dependency(source_task, after=after) - after = (source_task,) - - def fan_out(self, tasks, after=()): - """ - create a fan-out. - tasks will be added to the graph with a dependency to - the target task. - - tasks flow order: - if tasks = (task0, task1, ..., taskn) - after -> task0 - |-> task1 - |... - \-> taskn - - :param tasks: list of WorkflowTask instances. - :param after: target to the tasks - """ - for source_task in tasks: - self.add_task(source_task) - self.dependency(source_task, after=after) - - def fan_in(self, source_task, after=None): - """ - create a fan-in. - source task will be added to the graph with a dependency to - the tasks. - - tasks flow order: - if after = (task0, task1, ..., taskn) - task0\ - task1|-> source_task - ... | - taskn/ - - :param source_task: source to the tasks - :param after: list of WorkflowTask instances. - """ - self.add_task(source_task) - self.dependency(source_task, after=after) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/api/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/__init__.py b/tests/workflows/api/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/api/test_task_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task_graph.py b/tests/workflows/api/test_task_graph.py new file mode 100644 index 0000000..ba8862c --- /dev/null +++ b/tests/workflows/api/test_task_graph.py @@ -0,0 +1,658 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from uuid import uuid4 + +import pytest + +from aria.workflows.api.task_graph import TaskGraph, TaskNotInGraphError + + +class MockTask(object): + def __init__(self): + self.id = str(uuid4()) + + +@pytest.fixture +def graph(): + return TaskGraph(name='mock-graph') + + +class TestTaskGraphTasks(object): + + def test_add_task(self, graph): + task = MockTask() + add_result = graph.add_task(task) + assert add_result is True + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_add_existing_task(self, graph): + task = MockTask() + graph.add_task(task) + # adding a task already in graph - should have no effect, and return False + add_result = graph.add_task(task) + assert add_result is False + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_remove_task(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_task(task) + graph.add_task(other_task) + graph.remove_task(other_task) + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_remove_task_with_dependency(self, graph): + task = MockTask() + dependent_task = MockTask() + graph.add_task(task) + graph.add_task(dependent_task) + graph.add_dependency(dependent_task, task) + remove_result = graph.remove_task(dependent_task) + assert remove_result is True + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + # asserting no dependencies are left for the dependent task + assert len(list(graph.get_task_dependencies(task))) == 0 + + def test_remove_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + # removing a task not in graph - should have no effect, and return False + remove_result = graph.remove_task(task_not_in_graph) + assert remove_result is False + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_has_task(self, graph): + task = MockTask() + graph.add_task(task) + assert graph.has_task(task) is True + + def test_has_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + assert graph.has_task(task_not_in_graph) is False + + def test_get_task(self, graph): + task = MockTask() + graph.add_task(task) + assert graph.get_task(task.id) == task + + def test_get_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + graph.get_task(task_not_in_graph.id) + + +class TestTaskGraphGraphTraversal(object): + + def test_tasks_iteration(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_task(task) + graph.add_task(other_task) + tasks = [t for t in graph.tasks] + assert set(tasks) == {task, other_task} + + def test_get_task_dependents(self, graph): + task = MockTask() + dependent_task_1 = MockTask() + dependent_task_2 = MockTask() + transitively_dependent_task = MockTask() + + graph.add_task(task) + graph.add_task(dependent_task_1) + graph.add_task(dependent_task_2) + graph.add_task(transitively_dependent_task) + + graph.add_dependency(dependent_task_1, task) + graph.add_dependency(dependent_task_2, task) + graph.add_dependency(transitively_dependent_task, dependent_task_2) + + dependent_tasks = list(graph.get_task_dependents(task)) + # transitively_dependent_task not expected to appear in the result + assert set(dependent_tasks) == {dependent_task_1, dependent_task_2} + + def test_get_task_empty_dependents(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_task(task) + graph.add_task(other_task) + dependent_tasks = list(graph.get_task_dependents(task)) + assert len(dependent_tasks) == 0 + + def test_get_nonexistent_task_dependents(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + list(graph.get_task_dependents(task_not_in_graph)) + + def test_get_task_dependencies(self, graph): + task = MockTask() + dependency_task_1 = MockTask() + dependency_task_2 = MockTask() + transitively_dependency_task = MockTask() + + graph.add_task(task) + graph.add_task(dependency_task_1) + graph.add_task(dependency_task_2) + graph.add_task(transitively_dependency_task) + + graph.add_dependency(task, dependency_task_1) + graph.add_dependency(task, dependency_task_2) + graph.add_dependency(dependency_task_2, transitively_dependency_task) + + dependency_tasks = list(graph.get_task_dependencies(task)) + # transitively_dependency_task not expected to appear in the result + assert set(dependency_tasks) == {dependency_task_1, dependency_task_2} + + def test_get_task_empty_dependencies(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_task(task) + graph.add_task(other_task) + dependency_tasks = list(graph.get_task_dependencies(task)) + assert len(dependency_tasks) == 0 + + def test_get_nonexistent_task_dependencies(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + list(graph.get_task_dependencies(task_not_in_graph)) + + +class TestTaskGraphDependencies(object): + + def test_add_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + unrelated_task = MockTask() + graph.add_task(task) + graph.add_task(dependency_task) + graph.add_task(unrelated_task) + add_result = graph.add_dependency(task, dependency_task) + assert add_result is True + dependency_tasks = list(graph.get_task_dependencies(task)) + assert len(dependency_tasks) == 1 + assert dependency_tasks[0] == dependency_task + + def test_add_existing_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_task(task) + graph.add_task(dependency_task) + graph.add_dependency(task, dependency_task) + # adding a dependency already in graph - should have no effect, and return False + add_result = graph.add_dependency(task, dependency_task) + assert add_result is False + dependency_tasks = list(graph.get_task_dependencies(task)) + assert len(dependency_tasks) == 1 + assert dependency_tasks[0] == dependency_task + + def test_add_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + graph.add_dependency(task_not_in_graph, task) + + def test_add_dependency_nonexistent_dependency(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + graph.add_dependency(task, task_not_in_graph) + + def test_has_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_task(task) + graph.add_task(dependency_task) + graph.add_dependency(task, dependency_task) + assert graph.has_dependency(task, dependency_task) is True + + def test_has_nonexistent_dependency(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_task(task) + graph.add_task(other_task) + assert graph.has_dependency(task, other_task) is False + + def test_has_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + graph.has_dependency(task_not_in_graph, task) + + def test_has_dependency_nonexistent_dependency(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + graph.has_dependency(task, task_not_in_graph) + + def test_remove_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + another_dependent_task = MockTask() + graph.add_task(task) + graph.add_task(dependency_task) + graph.add_task(another_dependent_task) + graph.add_dependency(task, dependency_task) + graph.add_dependency(another_dependent_task, dependency_task) + + remove_result = graph.remove_dependency(task, dependency_task) + assert remove_result is True + assert graph.has_dependency(task, dependency_task) is False + assert graph.has_dependency(another_dependent_task, dependency_task) is True + + def test_remove_nonexistent_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_task(task) + graph.add_task(dependency_task) + # removing a dependency not in graph - should have no effect, and return False + remove_result = graph.remove_dependency(task, dependency_task) + assert remove_result is False + tasks = [t for t in graph.tasks] + assert set(tasks) == {task, dependency_task} + + def test_remove_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + graph.remove_dependency(task_not_in_graph, task) + + def test_remove_dependency_nonexistent_dependency(self, graph): + # in this test the dependency *task* is not in the graph, not just the dependency itself + task = MockTask() + task_not_in_graph = MockTask() + graph.add_task(task) + with pytest.raises(TaskNotInGraphError): + graph.remove_dependency(task, task_not_in_graph) + + +class TestTaskGraphArrangements(object): + + def test_sequence(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.sequence(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + assert len(list(graph.get_task_dependencies(tasks[0]))) == 0 + assert set(graph.get_task_dependencies(tasks[1])) == {tasks[0]} + assert set(graph.get_task_dependencies(tasks[2])) == {tasks[1]} + + def test_sequence_with_some_tasks_and_dependencies_already_in_graph(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + # insert some tasks and dependencies to the graph + graph.add_task(tasks[1]) + graph.add_task(tasks[2]) + graph.add_dependency(tasks[2], tasks[1]) + + graph.sequence(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + assert len(list(graph.get_task_dependencies(tasks[0]))) == 0 + assert set(graph.get_task_dependencies(tasks[1])) == {tasks[0]} + assert set(graph.get_task_dependencies(tasks[2])) == {tasks[1]} + + def test_sequence_with_nested_sequence(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.sequence(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # first task should have no dependencies + assert len(list(graph.get_task_dependencies(tasks[0]))) == 0 + # rest of the tasks should have a single dependency - each depends on the one before it + for i in xrange(1, len(tasks)): + assert set(graph.get_task_dependencies(tasks[i])) == {tasks[i-1]} + + def test_sequence_with_nested_parallel(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.sequence(tasks[0], graph.parallel(tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # first task should have no dependencies + assert len(list(graph.get_task_dependencies(tasks[0]))) == 0 + # rest of the tasks (except last) should have a single dependency - the first task + for i in xrange(1, 4): + assert set(graph.get_task_dependencies(tasks[i])) == {tasks[0]} + # last task should have have a dependency on all tasks except for the first one + assert set(graph.get_task_dependencies(tasks[4])) == {tasks[1], tasks[2], tasks[3]} + + def test_parallel(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.parallel(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + for i in xrange(len(tasks)): + assert len(list(graph.get_task_dependencies(tasks[i]))) == 0 + + def test_parallel_with_some_tasks_and_dependencies_already_in_graph(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + + # insert some tasks and dependencies to the graph + graph.add_task(tasks[1]) + graph.add_task(tasks[2]) + + graph.parallel(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + for i in xrange(len(tasks)): + assert len(list(graph.get_task_dependencies(tasks[i]))) == 0 + + def test_parallel_with_nested_sequence(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.parallel(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + + # tasks[2] and tasks[3] should each have a single dependency; the rest should have none + assert len(list(graph.get_task_dependencies(tasks[0]))) == 0 + assert len(list(graph.get_task_dependencies(tasks[1]))) == 0 + assert set(graph.get_task_dependencies(tasks[2])) == {tasks[1]} + assert set(graph.get_task_dependencies(tasks[3])) == {tasks[2]} + assert len(list(graph.get_task_dependencies(tasks[4]))) == 0 + + def test_parallel_with_nested_parallel(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.parallel(tasks[0], graph.parallel(tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # none of the tasks should have any dependencies + for i in xrange(len(tasks)): + assert len(list(graph.get_task_dependencies(tasks[i]))) == 0 + + def test_sequence_and_parallel_deep_nesting(self, graph): + # including connecting seq to parallel + pass + + +class TestTaskGraphDependenciesWithArrangements(object): + + def test_add_dependency_dependent_sequence(self, graph): + task = MockTask() + sequence_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + sequence = graph.sequence(*sequence_tasks) + graph.add_dependency(sequence, task) + assert graph.has_dependency(sequence_tasks[0], task) is True + assert graph.has_dependency(sequence_tasks[1], task) is False + assert graph.has_dependency(sequence_tasks[2], task) is False + + def test_add_dependency_dependency_sequence(self, graph): + task = MockTask() + sequence_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + sequence = graph.sequence(*sequence_tasks) + graph.add_dependency(task, sequence) + assert graph.has_dependency(task, sequence_tasks[0]) is False + assert graph.has_dependency(task, sequence_tasks[1]) is False + assert graph.has_dependency(task, sequence_tasks[2]) is True + + def test_add_dependency_dependent_parallel(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(parallel, task) + assert graph.has_dependency(parallel_tasks[0], task) is True + assert graph.has_dependency(parallel_tasks[1], task) is True + assert graph.has_dependency(parallel_tasks[2], task) is True + + def test_add_dependency_dependency_parallel(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(task, parallel) + assert graph.has_dependency(task, parallel_tasks[0]) is True + assert graph.has_dependency(task, parallel_tasks[1]) is True + assert graph.has_dependency(task, parallel_tasks[2]) is True + + def test_add_dependency_between_sequence_and_parallel(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(sequence, parallel) + for parallel_task in parallel_tasks: + assert graph.has_dependency(sequence_tasks[0], parallel_task) is True + assert graph.has_dependency(sequence_tasks[1], parallel_task) is False + assert graph.has_dependency(sequence_tasks[2], parallel_task) is False + + def test_add_dependency_between_parallel_and_sequence(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(parallel, sequence) + for parallel_task in parallel_tasks: + assert graph.has_dependency(parallel_task, sequence_tasks[0]) is False + assert graph.has_dependency(parallel_task, sequence_tasks[1]) is False + assert graph.has_dependency(parallel_task, sequence_tasks[2]) is True + + def test_add_dependency_dependency_parallel_with_some_existing_dependencies(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + # adding a dependency on a specific task manually, + # before adding a dependency on the whole parallel + graph.add_dependency(task, parallel_tasks[1]) + graph.add_dependency(task, parallel) + assert graph.has_dependency(task, parallel_tasks[0]) is True + assert graph.has_dependency(task, parallel_tasks[1]) is True + assert graph.has_dependency(task, parallel_tasks[2]) is True + + def test_add_existing_dependency_between_sequence_and_parallel(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + add_result = graph.add_dependency(sequence, parallel) + assert add_result is True + # adding a dependency already in graph - should have no effect, and return False + add_result = graph.add_dependency(sequence, parallel) + assert add_result is False + for parallel_task in parallel_tasks: + assert graph.has_dependency(sequence_tasks[0], parallel_task) is True + assert graph.has_dependency(sequence_tasks[1], parallel_task) is False + assert graph.has_dependency(sequence_tasks[2], parallel_task) is False + + def test_has_dependency_dependent_sequence(self, graph): + task = MockTask() + sequence_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + sequence = graph.sequence(*sequence_tasks) + assert graph.has_dependency(sequence, task) is False + graph.add_dependency(sequence, task) + assert graph.has_dependency(sequence, task) is True + + def test_has_dependency_dependency_sequence(self, graph): + task = MockTask() + sequence_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + sequence = graph.sequence(*sequence_tasks) + assert graph.has_dependency(task, sequence) is False + graph.add_dependency(task, sequence) + assert graph.has_dependency(task, sequence) is True + + def test_has_dependency_dependent_parallel(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + assert graph.has_dependency(parallel, task) is False + graph.add_dependency(parallel, task) + assert graph.has_dependency(parallel, task) is True + + def test_has_dependency_dependency_parallel(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + assert graph.has_dependency(task, parallel) is False + graph.add_dependency(task, parallel) + assert graph.has_dependency(task, parallel) is True + + def test_has_dependency_between_sequence_and_parallel(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + assert graph.has_dependency(sequence, parallel) is False + graph.add_dependency(sequence, parallel) + assert graph.has_dependency(sequence, parallel) is True + + def test_has_dependency_between_parallel_and_sequence(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + assert graph.has_dependency(parallel, sequence) is False + graph.add_dependency(parallel, sequence) + assert graph.has_dependency(parallel, sequence) is True + + def test_has_dependency_dependency_parallel_with_some_existing_dependencies(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(task, parallel_tasks[1]) + # only a partial dependency exists - has_dependency is expected to return False + assert graph.has_dependency(task, parallel) is False + + def test_has_nonexistent_dependency_between_sequence_and_parallel(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + assert graph.has_dependency(sequence, parallel) is False + + def test_remove_dependency_dependent_sequence(self, graph): + task = MockTask() + sequence_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + sequence = graph.sequence(*sequence_tasks) + graph.add_dependency(sequence, task) + remove_result = graph.remove_dependency(sequence, task) + assert remove_result is True + assert graph.has_dependency(sequence_tasks[0], task) is False + assert graph.has_dependency(sequence_tasks[1], task) is False + assert graph.has_dependency(sequence_tasks[2], task) is False + + def test_remove_dependency_dependency_sequence(self, graph): + task = MockTask() + sequence_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + sequence = graph.sequence(*sequence_tasks) + graph.add_dependency(task, sequence) + remove_result = graph.remove_dependency(task, sequence) + assert remove_result is True + assert graph.has_dependency(task, sequence_tasks[0]) is False + assert graph.has_dependency(task, sequence_tasks[1]) is False + assert graph.has_dependency(task, sequence_tasks[2]) is False + + def test_remove_dependency_dependent_parallel(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(parallel, task) + remove_result = graph.remove_dependency(parallel, task) + assert remove_result is True + assert graph.has_dependency(parallel_tasks[0], task) is False + assert graph.has_dependency(parallel_tasks[1], task) is False + assert graph.has_dependency(parallel_tasks[2], task) is False + + def test_remove_dependency_dependency_parallel(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(task, parallel) + remove_result = graph.remove_dependency(task, parallel) + assert remove_result is True + assert graph.has_dependency(task, parallel_tasks[0]) is False + assert graph.has_dependency(task, parallel_tasks[1]) is False + assert graph.has_dependency(task, parallel_tasks[2]) is False + + def test_remove_dependency_between_sequence_and_parallel(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(sequence, parallel) + remove_result = graph.remove_dependency(sequence, parallel) + assert remove_result is True + for parallel_task in parallel_tasks: + assert graph.has_dependency(sequence_tasks[0], parallel_task) is False + assert graph.has_dependency(sequence_tasks[1], parallel_task) is False + assert graph.has_dependency(sequence_tasks[2], parallel_task) is False + + def test_remove_dependency_between_parallel_and_sequence(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(parallel, sequence) + remove_result = graph.remove_dependency(parallel, sequence) + assert remove_result is True + for parallel_task in parallel_tasks: + assert graph.has_dependency(parallel_task, sequence_tasks[0]) is False + assert graph.has_dependency(parallel_task, sequence_tasks[1]) is False + assert graph.has_dependency(parallel_task, sequence_tasks[2]) is False + + def test_remove_dependency_dependency_parallel_with_some_existing_dependencies(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_task(task) + parallel = graph.parallel(*parallel_tasks) + graph.add_dependency(task, parallel_tasks[1]) + remove_result = graph.remove_dependency(task, parallel) + # only a partial dependency exists - remove_dependency is expected to return False + assert remove_result is False + # no dependencies are expected to have changed + assert graph.has_dependency(task, parallel_tasks[0]) is False + assert graph.has_dependency(task, parallel_tasks[1]) is True + assert graph.has_dependency(task, parallel_tasks[2]) is False + + def test_remove_nonexistent_dependency_between_sequence_and_parallel(self, graph): + sequence_tasks = [MockTask() for _ in xrange(3)] + parallel_tasks = [MockTask() for _ in xrange(3)] + sequence = graph.sequence(*sequence_tasks) + parallel = graph.parallel(*parallel_tasks) + # removing a dependency not in graph - should have no effect, and return False + remove_result = graph.remove_dependency(sequence, parallel) + assert remove_result is False http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/__init__.py b/tests/workflows/core/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/core/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_executor.py b/tests/workflows/core/test_executor.py new file mode 100644 index 0000000..27cb2ad --- /dev/null +++ b/tests/workflows/core/test_executor.py @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import uuid + +import pytest +import retrying + +from aria import events +from aria.storage import models +from aria.workflows.core import executor + + +class TestExecutor(object): + + @pytest.mark.parametrize('pool_size,executor_cls', [ + (1, executor.ThreadExecutor), + (2, executor.ThreadExecutor), + (1, executor.MultiprocessExecutor), + (2, executor.MultiprocessExecutor), + (0, executor.CurrentThreadBlockingExecutor) + ]) + def test_execute(self, pool_size, executor_cls): + self.executor = executor_cls(pool_size) + expected_value = 'value' + successful_task = MockTask(mock_successful_task) + failing_task = MockTask(mock_failing_task) + task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) + + for task in [successful_task, failing_task, task_with_inputs]: + self.executor.execute(task) + + @retrying.retry(stop_max_delay=10000, wait_fixed=100) + def assertion(): + assert successful_task.states == ['start', 'success'] + assert failing_task.states == ['start', 'failure'] + assert task_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_task.exception, TestException) + assert isinstance(task_with_inputs.exception, TestException) + assert task_with_inputs.exception.message == expected_value + assertion() + + def setup_method(self): + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + + def teardown_method(self): + events.start_task_signal.disconnect(start_handler) + events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler) + if self.executor: + self.executor.close() + + +def mock_successful_task(): + pass + + +def mock_failing_task(): + raise TestException + + +def mock_task_with_input(input): + raise TestException(input) + + +class TestException(Exception): + pass + + +class MockContext(object): + + def __init__(self, operation_details, inputs): + self.operation_details = operation_details + self.inputs = inputs + self.operation = models.Operation(execution_id='') + + +class MockTask(object): + + def __init__(self, func, inputs=None): + self.states = [] + self.exception = None + self.id = str(uuid.uuid4()) + name = func.__name__ + operation = 'tests.workflows.test_executor.{name}'.format(name=name) + self.context = MockContext(operation_details={'operation': operation}, + inputs=inputs or {}) + self.logger = logging.getLogger() + self.name = name + + +def start_handler(task, *args, **kwargs): + task.states.append('start') + + +def success_handler(task, *args, **kwargs): + task.states.append('success') + + +def failure_handler(task, exception, *args, **kwargs): + task.states.append('failure') + task.exception = exception http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py new file mode 100644 index 0000000..deab4a3 --- /dev/null +++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from networkx import topological_sort, DiGraph + +from aria import contexts +from aria.workflows.api import task_graph +from aria.workflows.core import tasks, translation + + +@pytest.fixture(autouse=True) +def no_storage(monkeypatch): + monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage', + value=lambda *args, **kwargs: None) + + +def test_task_graph_into_execution_graph(): + task_graph = task_graph.TaskGraph('test_task_graph') + simple_before_task = contexts.OperationContext('test_simple_before_task', {}, {}, None) + simple_after_task = contexts.OperationContext('test_simple_after_task', {}, {}, None) + + inner_task_graph = task_graph.TaskGraph('test_inner_task_graph') + inner_task = contexts.OperationContext('test_inner_task', {}, {}, None) + inner_task_graph.add_task(inner_task) + + task_graph.add_task(simple_before_task) + task_graph.add_task(simple_after_task) + task_graph.add_task(inner_task_graph) + task_graph.dependency(inner_task_graph, [simple_before_task]) + task_graph.dependency(simple_after_task, [inner_task_graph]) + + # Direct check + execution_graph = DiGraph() + translation.build_execution_graph(task_graph=task_graph, + workflow_context=None, + execution_graph=execution_graph) + execution_tasks = topological_sort(execution_graph) + + assert len(execution_tasks) == 7 + + expected_tasks_names = [ + '{0}-Start'.format(task_graph.id), + simple_before_task.id, + '{0}-Start'.format(inner_task_graph.id), + inner_task.id, + '{0}-End'.format(inner_task_graph.id), + simple_after_task.id, + '{0}-End'.format(task_graph.id) + ] + + assert expected_tasks_names == execution_tasks + + assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), + tasks.StartWorkflowTask) + assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), + tasks.StartSubWorkflowTask) + assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), tasks. + EndSubWorkflowTask) + assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), tasks.EndWorkflowTask) + + +def _get_task_by_name(task_name, graph): + return graph.node[task_name]['task'] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_executor.py b/tests/workflows/test_executor.py deleted file mode 100644 index 27cb2ad..0000000 --- a/tests/workflows/test_executor.py +++ /dev/null @@ -1,117 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import uuid - -import pytest -import retrying - -from aria import events -from aria.storage import models -from aria.workflows.core import executor - - -class TestExecutor(object): - - @pytest.mark.parametrize('pool_size,executor_cls', [ - (1, executor.ThreadExecutor), - (2, executor.ThreadExecutor), - (1, executor.MultiprocessExecutor), - (2, executor.MultiprocessExecutor), - (0, executor.CurrentThreadBlockingExecutor) - ]) - def test_execute(self, pool_size, executor_cls): - self.executor = executor_cls(pool_size) - expected_value = 'value' - successful_task = MockTask(mock_successful_task) - failing_task = MockTask(mock_failing_task) - task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) - - for task in [successful_task, failing_task, task_with_inputs]: - self.executor.execute(task) - - @retrying.retry(stop_max_delay=10000, wait_fixed=100) - def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 'failure'] - assert isinstance(failing_task.exception, TestException) - assert isinstance(task_with_inputs.exception, TestException) - assert task_with_inputs.exception.message == expected_value - assertion() - - def setup_method(self): - events.start_task_signal.connect(start_handler) - events.on_success_task_signal.connect(success_handler) - events.on_failure_task_signal.connect(failure_handler) - - def teardown_method(self): - events.start_task_signal.disconnect(start_handler) - events.on_success_task_signal.disconnect(success_handler) - events.on_failure_task_signal.disconnect(failure_handler) - if self.executor: - self.executor.close() - - -def mock_successful_task(): - pass - - -def mock_failing_task(): - raise TestException - - -def mock_task_with_input(input): - raise TestException(input) - - -class TestException(Exception): - pass - - -class MockContext(object): - - def __init__(self, operation_details, inputs): - self.operation_details = operation_details - self.inputs = inputs - self.operation = models.Operation(execution_id='') - - -class MockTask(object): - - def __init__(self, func, inputs=None): - self.states = [] - self.exception = None - self.id = str(uuid.uuid4()) - name = func.__name__ - operation = 'tests.workflows.test_executor.{name}'.format(name=name) - self.context = MockContext(operation_details={'operation': operation}, - inputs=inputs or {}) - self.logger = logging.getLogger() - self.name = name - - -def start_handler(task, *args, **kwargs): - task.states.append('start') - - -def success_handler(task, *args, **kwargs): - task.states.append('success') - - -def failure_handler(task, exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/107c9729/tests/workflows/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_task_graph_into_exececution_graph.py b/tests/workflows/test_task_graph_into_exececution_graph.py deleted file mode 100644 index 1bae713..0000000 --- a/tests/workflows/test_task_graph_into_exececution_graph.py +++ /dev/null @@ -1,79 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest -from networkx import topological_sort, DiGraph - -from aria import contexts -from aria.workflows.api import tasks_graph -from aria.workflows.core import tasks, translation - - -@pytest.fixture(autouse=True) -def no_storage(monkeypatch): - monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage', - value=lambda *args, **kwargs: None) - - -def test_task_graph_into_execution_graph(): - task_graph = tasks_graph.TaskGraph('test_task_graph') - simple_before_task = contexts.OperationContext('test_simple_before_task', {}, {}, None) - simple_after_task = contexts.OperationContext('test_simple_after_task', {}, {}, None) - - inner_task_graph = tasks_graph.TaskGraph('test_inner_task_graph') - inner_task = contexts.OperationContext('test_inner_task', {}, {}, None) - inner_task_graph.add_task(inner_task) - - task_graph.add_task(simple_before_task) - task_graph.add_task(simple_after_task) - task_graph.add_task(inner_task_graph) - task_graph.dependency(inner_task_graph, [simple_before_task]) - task_graph.dependency(simple_after_task, [inner_task_graph]) - - # Direct check - execution_graph = DiGraph() - translation.build_execution_graph(task_graph=task_graph, - workflow_context=None, - execution_graph=execution_graph) - execution_tasks = topological_sort(execution_graph) - - assert len(execution_tasks) == 7 - - expected_tasks_names = [ - '{0}-Start'.format(task_graph.id), - simple_before_task.id, - '{0}-Start'.format(inner_task_graph.id), - inner_task.id, - '{0}-End'.format(inner_task_graph.id), - simple_after_task.id, - '{0}-End'.format(task_graph.id) - ] - - assert expected_tasks_names == execution_tasks - - assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), - tasks.StartWorkflowTask) - assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context - assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), - tasks.StartSubWorkflowTask) - assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context - assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), tasks. - EndSubWorkflowTask) - assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context - assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), tasks.EndWorkflowTask) - - -def _get_task_by_name(task_name, graph): - return graph.node[task_name]['task']