ARIA-4 modified graph group api
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/e721ba61 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/e721ba61 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/e721ba61 Branch: refs/heads/ARIA-3-api-for-creating-workflows Commit: e721ba61ba1ac13f55a622cb43b0dd9f86288f6e Parents: 6084453 Author: Ran Ziv <r...@gigaspaces.com> Authored: Sun Oct 30 09:05:23 2016 +0200 Committer: Ran Ziv <r...@gigaspaces.com> Committed: Sun Oct 30 09:05:23 2016 +0200 ---------------------------------------------------------------------- aria/workflows/api/task_graph.py | 113 +++++++--------------------- tests/workflows/api/test_task_graph.py | 28 +++---- 2 files changed, 43 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e721ba61/aria/workflows/api/task_graph.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task_graph.py b/aria/workflows/api/task_graph.py index f645580..2d8ee54 100644 --- a/aria/workflows/api/task_graph.py +++ b/aria/workflows/api/task_graph.py @@ -215,64 +215,32 @@ class TaskGraph(object): self._graph.remove_edge(dependent_task.id, dependency_task.id) return True - def sequence(self, item, *items): - """ - A graph-building helper, useful for chaining items in a sequence - - it'll add a dependency for each item on the one that came before it - - an item may be either a task, a sequence or a parallel - this method will also add to the graph each of the items not already present in it - - :param BaseTask|_TasksArrangement item: The first item (task, sequence or parallel) - :param BaseTask|_TasksArrangement *items: The rest of the items (task, sequence or parallel) - :return: The sequence object - :rtype: _TasksSequence - """ - all_items = [item] + [i for i in items if i] - prev_item = None - - for an_item in all_items: - if not isinstance(an_item, TaskGraph._TasksArrangement) and not self.has_task(an_item): - self.add_task(an_item) - if prev_item is not None: - self.add_dependency(an_item, prev_item) - prev_item = an_item - - return self._TasksSequence(all_items) - - def parallel(self, item, *items): - """ - A graph-building helper, useful for allowing for several items to execute in parallel - this method won't actually add any dependency between the items passed to it - - but the returned object may be used to add dependencies for and on this group of items + def sequence(self, *items): + group = self.parallel(*items) - an item may be either a task, a sequence or a parallel - this method will also add to the graph each of the items not already present in it + for i in xrange(1, len(items)): + self.add_dependency(items[i], items[i-1]) - :param BaseTask|_TasksArrangement item: One item (task, sequence or parallel) - :param BaseTask|_TasksArrangement *items: The rest of the items (task, sequence or parallel) - :return: The parallel object - :rtype: _TasksParallel - """ - all_items = [item] + [i for i in items if i] + return group - for an_item in all_items: - if not isinstance(an_item, TaskGraph._TasksArrangement) and not self.has_task(an_item): - self.add_task(an_item) + def parallel(self, *items): + for item in items: + if not isinstance(item, _TasksGroup): + self.add_task(item) - return self._TasksParallel(all_items) + return _TasksGroup(items) def _extract_dependent_and_dependency_tasks(self, dependent, dependency): - if isinstance(dependent, TaskGraph._TasksArrangement): - dependent_tasks = dependent.get_first_tasks() + if isinstance(dependent, _TasksGroup): + dependent_tasks = dependent.tasks else: if not self._graph.has_node(dependent.id): raise TaskNotInGraphError( 'dependent task {0!r} is not in graph (task id: {0.id})'.format(dependent)) dependent_tasks = [dependent] - if isinstance(dependency, TaskGraph._TasksArrangement): - dependency_tasks = dependency.get_last_tasks() + if isinstance(dependency, _TasksGroup): + dependency_tasks = dependency.tasks else: if not self._graph.has_node(dependency.id): raise TaskNotInGraphError( @@ -281,42 +249,17 @@ class TaskGraph(object): return dependent_tasks, dependency_tasks - class _TasksArrangement(object): - def __init__(self, tasks): - self._tasks = tasks - - def get_first_tasks(self): - """ - The first tasks to be executed in the arrangement - :rtype list - :return: list of the first tasks to be executed in the arrangement - """ - raise NotImplementedError() - - def get_last_tasks(self): - """ - The last tasks to be executed in the arrangement - :rtype list - :return: list of the last tasks to be executed in the arrangement - """ - raise NotImplementedError() - - class _TasksSequence(_TasksArrangement): - def __init__(self, tasks): - super(TaskGraph._TasksSequence, self).__init__(tasks) - - def get_first_tasks(self): - return [self._tasks[0]] - - def get_last_tasks(self): - return [self._tasks[-1]] - - class _TasksParallel(_TasksArrangement): - def __init__(self, tasks): - super(TaskGraph._TasksParallel, self).__init__(tasks) - - def get_first_tasks(self): - return self._tasks - - def get_last_tasks(self): - return self._tasks + +class _TasksGroup(object): + + def __init__(self, tasks): + self._tasks = map(lambda t: t.tasks if isinstance(t, _TasksGroup) else t, tasks) + + @property + def tasks(self): + """ + The tasks in this group + :rtype list + :return: tasks list + """ + return self._tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e721ba61/tests/workflows/api/test_task_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task_graph.py b/tests/workflows/api/test_task_graph.py index ba8862c..63d1c7c 100644 --- a/tests/workflows/api/test_task_graph.py +++ b/tests/workflows/api/test_task_graph.py @@ -333,9 +333,11 @@ class TestTaskGraphArrangements(object): assert set(graph_tasks) == set(tasks) # first task should have no dependencies assert len(list(graph.get_task_dependencies(tasks[0]))) == 0 - # rest of the tasks should have a single dependency - each depends on the one before it - for i in xrange(1, len(tasks)): - assert set(graph.get_task_dependencies(tasks[i])) == {tasks[i-1]} + assert len(list(graph.get_task_dependencies(tasks[1]))) == 1 + assert len(list(graph.get_task_dependencies(tasks[2]))) == 2 + assert len(list(graph.get_task_dependencies(tasks[3]))) == 2 + assert len(list(graph.get_task_dependencies(tasks[4]))) == 3 + def test_sequence_with_nested_parallel(self, graph): tasks = [MockTask() for _ in xrange(5)] @@ -407,8 +409,8 @@ class TestTaskGraphDependenciesWithArrangements(object): sequence = graph.sequence(*sequence_tasks) graph.add_dependency(sequence, task) assert graph.has_dependency(sequence_tasks[0], task) is True - assert graph.has_dependency(sequence_tasks[1], task) is False - assert graph.has_dependency(sequence_tasks[2], task) is False + assert graph.has_dependency(sequence_tasks[1], task) is True + assert graph.has_dependency(sequence_tasks[2], task) is True def test_add_dependency_dependency_sequence(self, graph): task = MockTask() @@ -416,8 +418,8 @@ class TestTaskGraphDependenciesWithArrangements(object): graph.add_task(task) sequence = graph.sequence(*sequence_tasks) graph.add_dependency(task, sequence) - assert graph.has_dependency(task, sequence_tasks[0]) is False - assert graph.has_dependency(task, sequence_tasks[1]) is False + assert graph.has_dependency(task, sequence_tasks[0]) is True + assert graph.has_dependency(task, sequence_tasks[1]) is True assert graph.has_dependency(task, sequence_tasks[2]) is True def test_add_dependency_dependent_parallel(self, graph): @@ -448,8 +450,8 @@ class TestTaskGraphDependenciesWithArrangements(object): graph.add_dependency(sequence, parallel) for parallel_task in parallel_tasks: assert graph.has_dependency(sequence_tasks[0], parallel_task) is True - assert graph.has_dependency(sequence_tasks[1], parallel_task) is False - assert graph.has_dependency(sequence_tasks[2], parallel_task) is False + assert graph.has_dependency(sequence_tasks[1], parallel_task) is True + assert graph.has_dependency(sequence_tasks[2], parallel_task) is True def test_add_dependency_between_parallel_and_sequence(self, graph): sequence_tasks = [MockTask() for _ in xrange(3)] @@ -458,8 +460,8 @@ class TestTaskGraphDependenciesWithArrangements(object): parallel = graph.parallel(*parallel_tasks) graph.add_dependency(parallel, sequence) for parallel_task in parallel_tasks: - assert graph.has_dependency(parallel_task, sequence_tasks[0]) is False - assert graph.has_dependency(parallel_task, sequence_tasks[1]) is False + assert graph.has_dependency(parallel_task, sequence_tasks[0]) is True + assert graph.has_dependency(parallel_task, sequence_tasks[1]) is True assert graph.has_dependency(parallel_task, sequence_tasks[2]) is True def test_add_dependency_dependency_parallel_with_some_existing_dependencies(self, graph): @@ -487,8 +489,8 @@ class TestTaskGraphDependenciesWithArrangements(object): assert add_result is False for parallel_task in parallel_tasks: assert graph.has_dependency(sequence_tasks[0], parallel_task) is True - assert graph.has_dependency(sequence_tasks[1], parallel_task) is False - assert graph.has_dependency(sequence_tasks[2], parallel_task) is False + assert graph.has_dependency(sequence_tasks[1], parallel_task) is True + assert graph.has_dependency(sequence_tasks[2], parallel_task) is True def test_has_dependency_dependent_sequence(self, graph): task = MockTask()