http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/workflows.py 
b/aria/workflows/builtin/workflows.py
index b6fbb94..fc54f75 100644
--- a/aria/workflows/builtin/workflows.py
+++ b/aria/workflows/builtin/workflows.py
@@ -21,6 +21,8 @@ from itertools import groupby
 
 from aria import workflow
 
+from ..api import task
+
 
 __all__ = (
     'install_node_instance',
@@ -32,43 +34,43 @@ __all__ = (
 # Install node instance workflow and sub workflows
 
 @workflow(suffix_template='{node_instance.id}')
-def install_node_instance(context, graph, node_instance):
+def install_node_instance(ctx, graph, node_instance):
     """
     A workflow which installs a node instance.
-    :param WorkflowContext context: the workflow context
+    :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the tasks graph of which to edit
     :param node_instance: the node instance to install
     :return:
     """
-    create_node_instance = context.operation(
+    create_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.create.{0}'.format(node_instance.id),
-        operation_details=node_instance.node.operations[
-            'aria.interfaces.lifecycle.create'],
+        
operation_details=node_instance.node.operations['aria.interfaces.lifecycle.create'],
         node_instance=node_instance
     )
-    configure_node_instance = context.operation(
+    configure_node_instance = task.OperationTask(
         
name='aria.interfaces.lifecycle.configure.{0}'.format(node_instance.id),
         
operation_details=node_instance.node.operations['aria.interfaces.lifecycle.configure'],
         node_instance=node_instance
         )
-    start_node_instance = context.operation(
+    start_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.start.{0}'.format(node_instance.id),
-        operation_details=node_instance.node.operations[
-            'aria.interfaces.lifecycle.start'],
+        
operation_details=node_instance.node.operations['aria.interfaces.lifecycle.start'],
         node_instance=node_instance
     )
-    graph.chain(tasks=[
+
+    graph.sequence(
         create_node_instance,
-        preconfigure_relationship(context=context, 
node_instance=node_instance),
+        preconfigure_relationship(graph, ctx, node_instance),
         configure_node_instance,
-        postconfigure_relationship(context=context, 
node_instance=node_instance),
+        postconfigure_relationship(graph, ctx, node_instance),
         start_node_instance,
-        establish_relationship(context=context, node_instance=node_instance),
-    ])
+        establish_relationship(graph, ctx, node_instance)
+    )
 
+    return graph
 
-@workflow(suffix_template='{node_instance.id}')
-def preconfigure_relationship(context, graph, node_instance):
+
+def preconfigure_relationship(graph, ctx, node_instance):
     """
 
     :param context:
@@ -76,14 +78,14 @@ def preconfigure_relationship(context, graph, 
node_instance):
     :param node_instance:
     :return:
     """
-    graph.chain(tasks=relationships_tasks(
+    return relationships_tasks(
+        graph=graph,
         operation_name='aria.interfaces.relationship_lifecycle.preconfigure',
-        context=context,
-        node_instance=node_instance))
+        context=ctx,
+        node_instance=node_instance)
 
 
-@workflow(suffix_template='{node_instance.id}')
-def postconfigure_relationship(context, graph, node_instance):
+def postconfigure_relationship(graph, ctx, node_instance):
     """
 
     :param context:
@@ -91,14 +93,14 @@ def postconfigure_relationship(context, graph, 
node_instance):
     :param node_instance:
     :return:
     """
-    graph.chain(tasks=relationships_tasks(
+    return relationships_tasks(
+        graph=graph,
         operation_name='aria.interfaces.relationship_lifecycle.postconfigure',
-        context=context,
-        node_instance=node_instance))
+        context=ctx,
+        node_instance=node_instance)
 
 
-@workflow(suffix_template='{node_instance.id}')
-def establish_relationship(context, graph, node_instance):
+def establish_relationship(graph, ctx, node_instance):
     """
 
     :param context:
@@ -106,16 +108,17 @@ def establish_relationship(context, graph, node_instance):
     :param node_instance:
     :return:
     """
-    graph.chain(tasks=relationships_tasks(
+    return relationships_tasks(
+        graph=graph,
         operation_name='aria.interfaces.relationship_lifecycle.establish',
-        context=context,
-        node_instance=node_instance))
+        context=ctx,
+        node_instance=node_instance)
 
 
 # Uninstall node instance workflow and subworkflows
 
 @workflow(suffix_template='{node_instance.id}')
-def uninstall_node_instance(graph, context, node_instance):
+def uninstall_node_instance(ctx, graph, node_instance):
     """
         A workflow which uninstalls a node instance.
         :param WorkflowContext context: the workflow context
@@ -123,28 +126,25 @@ def uninstall_node_instance(graph, context, 
node_instance):
         :param node_instance: the node instance to uninstall
         :return:
         """
-    stop_node_instance = context.operation(
+    stop_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.stop.{0}'.format(node_instance.id),
-        operation_details=node_instance.node.operations[
-            'aria.interfaces.lifecycle.stop'],
+        
operation_details=node_instance.node.operations['aria.interfaces.lifecycle.stop'],
         node_instance=node_instance
     )
-    delete_node_instance = context.operation(
+    delete_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.delete.{0}'.format(node_instance.id),
-        operation_details=node_instance.node.operations[
-            'aria.interfaces.lifecycle.delete'],
+        
operation_details=node_instance.node.operations['aria.interfaces.lifecycle.delete'],
         node_instance=node_instance
     )
 
-    graph.chain(tasks=[
+    graph.sequence(
         stop_node_instance,
-        unlink_relationship(context=context, node_instance=node_instance),
-        delete_node_instance,
-    ])
+        unlink_relationship(graph, ctx, node_instance),
+        delete_node_instance
+    )
 
 
-@workflow(suffix_template='{node_instance.id}')
-def unlink_relationship(context, graph, node_instance):
+def unlink_relationship(graph, ctx, node_instance):
     """
 
     :param context:
@@ -152,19 +152,15 @@ def unlink_relationship(context, graph, node_instance):
     :param node_instance:
     :return:
     """
-    tasks = relationships_tasks(
+    return relationships_tasks(
+        graph=graph,
         operation_name='aria.interfaces.relationship_lifecycle.unlink',
-        context=context,
+        context=ctx,
         node_instance=node_instance
     )
-    graph.chain(tasks=tasks)
-    return tasks
 
 
-@workflow(suffix_template='{node_instance.id}.{operation}')
 def execute_operation_on_instance(
-        context,
-        graph,
         node_instance,
         operation,
         operation_kwargs,
@@ -187,16 +183,18 @@ def execute_operation_on_instance(
         node_instance=node_instance,
         operation_name=operation)
 
-    graph.add_task(
-        context.operation(
-            name=task_name,
-            operation_details=node_instance.node.operations[operation],
-            node_instance=node_instance,
-            parameters=operation_kwargs)
-    )
+    return task.OperationTask(
+        name=task_name,
+        operation_details=node_instance.node.operations[operation],
+        node_instance=node_instance,
+        inputs=operation_kwargs)
 
 
-def relationships_tasks(operation_name, context, node_instance):
+
+def relationships_tasks(graph,
+                        operation_name,
+                        context,
+                        node_instance):
     """
     Creates a relationship task (source and target) for all of a node_instance 
relationships.
     :param basestring operation_name: the relationship operation name.
@@ -211,17 +209,24 @@ def relationships_tasks(operation_name, context, 
node_instance):
     sub_tasks = []
     for index, (_, relationship_group) in enumerate(relationships_groups):
         for relationship_instance in relationship_group:
-            relationship_subgraph = relationship_tasks(
+            relationship_operations = relationship_tasks(
+                graph=graph,
                 node_instance=node_instance,
                 relationship_instance=relationship_instance,
                 context=context,
                 operation_name=operation_name,
                 index=index)
-            sub_tasks.append(relationship_subgraph)
-    return sub_tasks
+            sub_tasks.append(relationship_operations)
+
+    return graph.sequence(*sub_tasks)
 
 
-def relationship_tasks(node_instance, relationship_instance, context, 
operation_name, index=None):
+def relationship_tasks(graph,
+                       node_instance,
+                       relationship_instance,
+                       context,
+                       operation_name,
+                       index=None):
     """
     Creates a relationship task source and target.
     :param NodeInstance node_instance: the node instance of the relationship
@@ -232,29 +237,20 @@ def relationship_tasks(node_instance, 
relationship_instance, context, operation_
     :return:
     """
     index = index or 
node_instance.relationship_instances.index(relationship_instance)
-    sub_workflow_name = '{name}.{index}.{node_instance.id}'.format(
-        name=operation_name,
-        index=index,
-        node_instance=node_instance,
-    )
     operation_name_template = '{name}.{index}.{{0}}.<{source_id}, 
{target_id}>'.format(
         name=operation_name,
         index=index,
         source_id=node_instance.id,
         target_id=relationship_instance.target_id,
     )
-    source_operation = context.operation(
+    source_operation = task.OperationTask(
         name=operation_name_template.format('source'),
         node_instance=node_instance,
         operation_details=relationship_instance.relationship.source_operations[
             operation_name])
-    target_operation = context.operation(
+    target_operation = task.OperationTask(
         name=operation_name_template.format('target'),
-        node_instance=context.storage.node_instance.get(
-            relationship_instance.target_id),
+        
node_instance=context.model.node_instance.get(relationship_instance.target_id),
         operation_details=relationship_instance.relationship.target_operations[
             operation_name])
-    sub_graph = context.task_graph(name=sub_workflow_name)
-    sub_graph.add_task(source_operation)
-    sub_graph.add_task(target_operation)
-    return sub_graph
+    return graph.add_tasks(source_operation, target_operation)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/__init__.py b/aria/workflows/core/__init__.py
index ae1e83e..646f44a 100644
--- a/aria/workflows/core/__init__.py
+++ b/aria/workflows/core/__init__.py
@@ -12,3 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+"""
+Core for the workflow execution mechanism
+"""
+
+from . import task

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index a00bc84..a288757 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -22,12 +22,11 @@ from datetime import datetime
 
 import networkx
 
-from aria import events, logger
-from aria.storage import models
-
+from ... import events, logger
+from ...storage import models
 from .. import exceptions
 from . import translation
-from . import tasks
+from . import task as engine_task
 
 
 class Engine(logger.LoggerMixin):
@@ -41,7 +40,6 @@ class Engine(logger.LoggerMixin):
         self._execution_graph = networkx.DiGraph()
         self._executor = executor
         translation.build_execution_graph(task_graph=tasks_graph,
-                                          workflow_context=workflow_context,
                                           
execution_graph=self._execution_graph)
 
     def execute(self):
@@ -67,13 +65,12 @@ class Engine(logger.LoggerMixin):
     def _executable_tasks(self):
         now = datetime.now()
         return (task for task in self._tasks_iter()
-                if task.status == models.Operation.PENDING and
+                if task.status == models.Task.PENDING and
                 task.eta <= now and
                 not self._task_has_dependencies(task))
 
     def _ended_tasks(self):
-        return (task for task in self._tasks_iter()
-                if task.status in models.Operation.END_STATES)
+        return (task for task in self._tasks_iter() if task.status in 
models.Task.END_STATES)
 
     def _task_has_dependencies(self, task):
         return len(self._execution_graph.pred.get(task.id, {})) > 0
@@ -85,14 +82,14 @@ class Engine(logger.LoggerMixin):
         return (data['task'] for _, data in 
self._execution_graph.nodes_iter(data=True))
 
     def _handle_executable_task(self, task):
-        if isinstance(task, tasks.BaseWorkflowTask):
-            task.status = models.Operation.SUCCESS
+        if isinstance(task, engine_task.BaseWorkflowTask):
+            task.status = models.Task.SUCCESS
         else:
             events.sent_task_signal.send(task)
             self._executor.execute(task)
 
     def _handle_ended_tasks(self, task):
-        if task.status == models.Operation.FAILED:
+        if task.status == models.Task.FAILED:
             raise exceptions.ExecutorException('Workflow failed')
         else:
             self._execution_graph.remove_node(task.id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py
new file mode 100644
index 0000000..fc72b59
--- /dev/null
+++ b/aria/workflows/core/task.py
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow tasks
+"""
+from contextlib import contextmanager
+from datetime import datetime
+
+from ... import logger
+from ...storage import models
+from .. import exceptions
+
+
+class BaseTask(logger.LoggerMixin):
+    """
+    Base class for Task objects
+    """
+
+    def __init__(self, id, *args, **kwargs):
+        super(BaseTask, self).__init__(*args, **kwargs)
+        self._id = id
+
+    @property
+    def id(self):
+        """
+        :return: the task's id
+        """
+        return self._id
+
+
+class BaseWorkflowTask(BaseTask):
+    """
+    Base class for all workflow wrapping tasks
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(BaseWorkflowTask, self).__init__(*args, **kwargs)
+        self.status = models.Task.PENDING
+        self.eta = datetime.now()
+
+
+class StartWorkflowTask(BaseWorkflowTask):
+    """
+    Tasks marking a workflow start
+    """
+    pass
+
+
+class EndWorkflowTask(BaseWorkflowTask):
+    """
+    Tasks marking a workflow end
+    """
+    pass
+
+
+class StartSubWorkflowTask(BaseWorkflowTask):
+    """
+    Tasks marking a subworkflow start
+    """
+    pass
+
+
+class EndSubWorkflowTask(BaseWorkflowTask):
+    """
+    Tasks marking a subworkflow end
+    """
+    pass
+
+
+class OperationTask(BaseTask, logger.LoggerMixin):
+    """
+    Operation tasks
+    """
+
+    def __init__(self, api_task, *args, **kwargs):
+        super(OperationTask, self).__init__(id=api_task.id, **kwargs)
+        self._workflow_ctx = api_task.workflow_context
+        task_model = api_task.workflow_context.model.task.model_cls
+        task = task_model(
+            name=api_task.name,
+            operation_details=api_task.operation_details,
+            node_instance=api_task.node_instance,
+            inputs=api_task.inputs,
+            status=task_model.PENDING,
+            execution_id=self.workflow_context.execution_id,
+            max_retries=self.workflow_context.parameters.get('max_retries', 1),
+        )
+        self.workflow_context.model.task.store(task)
+        self._task_id = task.id
+        self._update_fields = None
+
+    @contextmanager
+    def update(self):
+        """
+        A context manager which puts the task into update mode, enabling 
fields update.
+        :yields: None
+        """
+        self._update_fields = {}
+        try:
+            yield
+            task = self.context
+            for key, value in self._update_fields.items():
+                setattr(task, key, value)
+            self.context = task
+        finally:
+            self._update_fields = None
+
+    @property
+    def workflow_context(self):
+        """
+        :return: the task's name
+        """
+        return self._workflow_ctx
+
+    @property
+    def context(self):
+        """
+        Returns the task model in storage
+        :return: task in storage
+        """
+        return self.workflow_context.model.task.get(self._task_id)
+
+    @context.setter
+    def context(self, value):
+        self.workflow_context.model.task.store(value)
+
+    @property
+    def status(self):
+        """
+        Returns the task status
+        :return: task status
+        """
+        return self.context.status
+
+    @status.setter
+    def status(self, value):
+        if self._update_fields is None:
+            raise exceptions.TaskException("Task is not in update mode")
+        self._update_fields['status'] = value
+
+    @property
+    def started_at(self):
+        """
+        Returns when the task started
+        :return: when task started
+        """
+        return self.context.started_at
+
+    @started_at.setter
+    def started_at(self, value):
+        if self._update_fields is None:
+            raise exceptions.TaskException("Task is not in update mode")
+        self._update_fields['started_at'] = value
+
+    @property
+    def ended_at(self):
+        """
+        Returns when the task ended
+        :return: when task ended
+        """
+        return self.context.ended_at
+
+    @ended_at.setter
+    def ended_at(self, value):
+        if self._update_fields is None:
+            raise exceptions.TaskException("Task is not in update mode")
+        self._update_fields['ended_at'] = value
+
+    @property
+    def retry_count(self):
+        """
+        Returns the retry count for the task
+        :return: retry count
+        """
+        return self.context.retry_count
+
+    @retry_count.setter
+    def retry_count(self, value):
+        if self._update_fields is None:
+            raise exceptions.TaskException("Task is not in update mode")
+        self._update_fields['retry_count'] = value
+
+    def __getattr__(self, attr):
+        try:
+            return getattr(self.context, attr)
+        except AttributeError:
+            return super(OperationTask, self).__getattribute__(attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py
deleted file mode 100644
index 98d7c13..0000000
--- a/aria/workflows/core/tasks.py
+++ /dev/null
@@ -1,121 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow tasks
-"""
-
-from datetime import datetime
-
-from aria import logger
-from aria.storage import models
-
-
-class BaseTask(logger.LoggerMixin):
-    """
-    Base class for Task objects
-    """
-
-    def __init__(self, id, name, context, *args, **kwargs):
-        super(BaseTask, self).__init__(*args, **kwargs)
-        self._id = id
-        self._name = name
-        self._context = context
-
-    @property
-    def id(self):
-        """
-        :return: the task's id
-        """
-        return self._id
-
-    @property
-    def name(self):
-        """
-        :return: the task's name
-        """
-        return self._name
-
-    @property
-    def context(self):
-        """
-        :return: the task's context
-        """
-        return self._context
-
-
-class BaseWorkflowTask(BaseTask):
-    """
-    Base class for all workflow wrapping tasks
-    """
-
-    def __init__(self, *args, **kwargs):
-        super(BaseWorkflowTask, self).__init__(*args, **kwargs)
-        self.status = models.Operation.PENDING
-        self.eta = datetime.now()
-
-
-class StartWorkflowTask(BaseWorkflowTask):
-    """
-    Tasks marking a workflow start
-    """
-    pass
-
-
-class EndWorkflowTask(BaseWorkflowTask):
-    """
-    Tasks marking a workflow end
-    """
-    pass
-
-
-class StartSubWorkflowTask(BaseWorkflowTask):
-    """
-    Tasks marking a subworkflow start
-    """
-    pass
-
-
-class EndSubWorkflowTask(BaseWorkflowTask):
-    """
-    Tasks marking a subworkflow end
-    """
-    pass
-
-
-class OperationTask(BaseTask):
-    """
-    Operation tasks
-    """
-
-    def __init__(self, *args, **kwargs):
-        super(OperationTask, self).__init__(*args, **kwargs)
-        self._create_operation_in_storage()
-
-    def _create_operation_in_storage(self):
-        operation_cls = self.context.model.operation.model_cls
-        operation = operation_cls(
-            id=self.context.id,
-            execution_id=self.context.execution_id,
-            max_retries=self.context.parameters.get('max_retries', 1),
-            status=operation_cls.PENDING,
-        )
-        self.context.operation = operation
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.context.operation, attr)
-        except AttributeError:
-            return super(OperationTask, self).__getattribute__(attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/translation.py 
b/aria/workflows/core/translation.py
index dc483c6..cd9a0e6 100644
--- a/aria/workflows/core/translation.py
+++ b/aria/workflows/core/translation.py
@@ -17,17 +17,15 @@
 Translation of user graph's API to the execution graph
 """
 
-from aria import contexts
-
-from . import tasks
+from . import task as core_task
+from .. import api
 
 
 def build_execution_graph(
         task_graph,
-        workflow_context,
         execution_graph,
-        start_cls=tasks.StartWorkflowTask,
-        end_cls=tasks.EndWorkflowTask,
+        start_cls=core_task.StartWorkflowTask,
+        end_cls=core_task.EndWorkflowTask,
         depends_on=()):
     """
     Translates the user graph to the execution graph
@@ -39,43 +37,36 @@ def build_execution_graph(
     :param depends_on: internal use
     """
     # Insert start marker
-    start_task = start_cls(id=_start_graph_suffix(task_graph.id),
-                           name=_start_graph_suffix(task_graph.name),
-                           context=workflow_context)
+    start_task = start_cls(id=_start_graph_suffix(task_graph.id))
     _add_task_and_dependencies(execution_graph, start_task, depends_on)
 
-    for operation_or_workflow, dependencies in 
task_graph.task_tree(reverse=True):
+    for api_task in task_graph.topological_order(reverse=True):
+        dependencies = task_graph.get_dependencies(api_task)
         operation_dependencies = _get_tasks_from_dependencies(
             execution_graph,
             dependencies,
             default=[start_task])
 
-        if _is_operation(operation_or_workflow):
+        if _is_operation(api_task):
             # Add the task an the dependencies
-            operation_task = tasks.OperationTask(id=operation_or_workflow.id,
-                                                 
name=operation_or_workflow.name,
-                                                 context=operation_or_workflow)
+            operation_task = core_task.OperationTask(api_task)
             _add_task_and_dependencies(execution_graph, operation_task, 
operation_dependencies)
         else:
             # Built the graph recursively while adding start and end markers
             build_execution_graph(
-                task_graph=operation_or_workflow,
-                workflow_context=workflow_context,
+                task_graph=api_task,
                 execution_graph=execution_graph,
-                start_cls=tasks.StartSubWorkflowTask,
-                end_cls=tasks.EndSubWorkflowTask,
+                start_cls=core_task.StartSubWorkflowTask,
+                end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies
             )
 
     # Insert end marker
     workflow_dependencies = _get_tasks_from_dependencies(
         execution_graph,
-        task_graph.leaf_tasks,
+        _get_non_dependency_tasks(task_graph),
         default=[start_task])
-    end_task = end_cls(
-        id=_end_graph_suffix(task_graph.id),
-        name=_end_graph_suffix(task_graph.name),
-        context=workflow_context)
+    end_task = end_cls(id=_end_graph_suffix(task_graph.id))
     _add_task_and_dependencies(execution_graph, end_task, 
workflow_dependencies)
 
 
@@ -95,7 +86,7 @@ def _get_tasks_from_dependencies(execution_graph, 
dependencies, default=()):
 
 
 def _is_operation(task):
-    return isinstance(task, contexts.OperationContext)
+    return isinstance(task, api.task.OperationTask)
 
 
 def _start_graph_suffix(id):
@@ -104,3 +95,9 @@ def _start_graph_suffix(id):
 
 def _end_graph_suffix(id):
     return '{0}-End'.format(id)
+
+
+def _get_non_dependency_tasks(graph):
+    for task in graph.tasks:
+        if len(list(graph.get_dependents(task))) == 0:
+            yield task

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/workflows/exceptions.py b/aria/workflows/exceptions.py
index b8ebc14..d7b189d 100644
--- a/aria/workflows/exceptions.py
+++ b/aria/workflows/exceptions.py
@@ -61,3 +61,10 @@ class AriaEngineError(Exception):
     """
     Raised by the workflow engine
     """
+
+
+class TaskException(Exception):
+    """
+    Raised by the task
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/blocking.py 
b/aria/workflows/executor/blocking.py
index 86171ba..f072d8a 100644
--- a/aria/workflows/executor/blocking.py
+++ b/aria/workflows/executor/blocking.py
@@ -29,9 +29,8 @@ class CurrentThreadBlockingExecutor(BaseExecutor):
     def execute(self, task):
         self._task_started(task)
         try:
-            operation_context = task.context
-            task_func = 
module.load_attribute(operation_context.operation_details['operation'])
-            task_func(**operation_context.inputs)
+            task_func = 
module.load_attribute(task.operation_details['operation'])
+            task_func(**task.inputs)
             self._task_succeeded(task)
         except BaseException as e:
             self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/celery.py 
b/aria/workflows/executor/celery.py
index 2d486f2..a82a6b7 100644
--- a/aria/workflows/executor/celery.py
+++ b/aria/workflows/executor/celery.py
@@ -43,11 +43,10 @@ class CeleryExecutor(BaseExecutor):
         self._started_queue.get(timeout=30)
 
     def execute(self, task):
-        operation_context = task.context
         self._tasks[task.id] = task
         self._results[task.id] = self._app.send_task(
-            operation_context.operation_details['operation'],
-            kwargs=operation_context.inputs,
+            task.operation_details['operation'],
+            kwargs=task.inputs,
             task_id=task.id,
             queue=self._get_queue(task))
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/multiprocess.py 
b/aria/workflows/executor/multiprocess.py
index e6faf5f..4af08c0 100644
--- a/aria/workflows/executor/multiprocess.py
+++ b/aria/workflows/executor/multiprocess.py
@@ -40,16 +40,15 @@ class MultiprocessExecutor(BaseExecutor):
         self._listener_thread = threading.Thread(target=self._listener)
         self._listener_thread.daemon = True
         self._listener_thread.start()
-        self._pool = multiprocessing.Pool(processes=pool_size,
-                                          maxtasksperchild=1)
+        self._pool = multiprocessing.Pool(processes=pool_size)
 
     def execute(self, task):
         self._tasks[task.id] = task
         self._pool.apply_async(_multiprocess_handler, args=(
             self._queue,
             task.id,
-            task.context.operation_details,
-            task.context.inputs))
+            task.operation_details,
+            task.inputs))
 
     def close(self):
         self._pool.close()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/thread.py 
b/aria/workflows/executor/thread.py
index dfc0f18..180c482 100644
--- a/aria/workflows/executor/thread.py
+++ b/aria/workflows/executor/thread.py
@@ -55,10 +55,8 @@ class ThreadExecutor(BaseExecutor):
                 task = self._queue.get(timeout=1)
                 self._task_started(task)
                 try:
-                    operation_context = task.context
-                    task_func = module.load_attribute(
-                        operation_context.operation_details['operation'])
-                    task_func(**operation_context.inputs)
+                    task_func = module.load_attribute(task.operation_details['operation'])
+                    task_func(**task.inputs)
                     self._task_succeeded(task)
                 except BaseException as e:
                     self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 1240b72..b550a58 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,4 +15,6 @@ networkx==1.9
 requests==2.7.0
 retrying==1.3.3
 blinker==1.4
+importlib==1.0.4 ; python_version < '2.7'
+ordereddict==1.1 ; python_version < '2.7'
 jsonpickle

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 811810b..6d7a30c 100644
--- a/setup.py
+++ b/setup.py
@@ -36,10 +36,6 @@ try:
 except IOError:
     install_requires = []
 
-try:
-    import importlib
-except ImportError:
-    install_requires.append('importlib')
 
 setup(
     name=_PACKAGE_NAME,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/.pylintrc
----------------------------------------------------------------------
diff --git a/tests/.pylintrc b/tests/.pylintrc
index 0f84473..c455d8a 100644
--- a/tests/.pylintrc
+++ b/tests/.pylintrc
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 [MASTER]
 
 # Python code to execute, usually for sys.path manipulation such as
@@ -62,7 +77,7 @@ confidence=
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use"--disable=all --enable=classes
 # --disable=W"
-disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init,too-many-locals
+disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init,redefined-outer-name,import-error,too-many-locals
 
 [REPORTS]
 
@@ -339,7 +354,7 @@ max-args=15
 ignored-argument-names=_.*
 
 # Maximum number of locals for function / method body
-max-locals=15
+max-locals=30
 
 # Maximum number of return / yield for function / method body
 max-returns=6
@@ -360,7 +375,7 @@ max-attributes=15
 min-public-methods=0
 
 # Maximum number of public methods for a class (see R0904).
-max-public-methods=20
+max-public-methods=50
 
 # Maximum number of boolean expressions in a if statement
 max-bool-expr=5

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
new file mode 100644
index 0000000..14541d0
--- /dev/null
+++ b/tests/mock/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from . import models, context, operations

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
new file mode 100644
index 0000000..a89612e
--- /dev/null
+++ b/tests/mock/context.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria import context, application_model_storage
+
+from . import models
+from ..storage import InMemoryModelDriver
+
+
+def simple():
+    storage = application_model_storage(InMemoryModelDriver())
+    storage.setup()
+    return context.workflow.WorkflowContext(
+        name='simple_context',
+        model_storage=storage,
+        resource_storage=None,
+        deployment_id=models.DEPLOYMENT_ID,
+        workflow_id=models.WORKFLOW_ID,
+        execution_id=models.EXECUTION_ID
+    )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
new file mode 100644
index 0000000..633adbb
--- /dev/null
+++ b/tests/mock/models.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from aria.storage import models
+
+from . import operations
+
+DEPLOYMENT_ID = 'test_deployment_id'
+BLUEPRINT_ID = 'test_blueprint_id'
+WORKFLOW_ID = 'test_workflow_id'
+EXECUTION_ID = 'test_execution_id'
+
+
+def get_dependency_node():
+    return models.Node(
+        id='dependency_node',
+        host_id='dependency_node',
+        blueprint_id=BLUEPRINT_ID,
+        type='test_node_type',
+        type_hierarchy=[],
+        number_of_instances=1,
+        planned_number_of_instances=1,
+        deploy_number_of_instances=1,
+        properties={},
+        operations=dict((key, {}) for key in operations.NODE_OPERATIONS),
+        relationships=[],
+        min_number_of_instances=1,
+        max_number_of_instances=1,
+    )
+
+
+def get_dependency_node_instance(dependency_node=None):
+    return models.NodeInstance(
+        id='dependency_node_instance',
+        host_id='dependency_node_instance',
+        deployment_id=DEPLOYMENT_ID,
+        runtime_properties={},
+        version=None,
+        relationship_instances=[],
+        node=dependency_node or get_dependency_node()
+    )
+
+
+def get_relationship(target=None):
+    return models.Relationship(
+        target_id=target.id or get_dependency_node().id,
+        source_interfaces={},
+        source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS),
+        target_interfaces={},
+        target_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS),
+        type='rel_type',
+        type_hierarchy=[],
+        properties={},
+    )
+
+
+def get_relationship_instance(target_instance=None, relationship=None):
+    return models.RelationshipInstance(
+        target_id=target_instance.id or get_dependency_node_instance().id,
+        target_name='test_target_name',
+        type='some_type',
+        relationship=relationship or get_relationship(target_instance.node
+                                                      if target_instance else None)
+    )
+
+
+def get_dependent_node(relationship=None):
+    return models.Node(
+        id='dependent_node',
+        host_id='dependent_node',
+        blueprint_id=BLUEPRINT_ID,
+        type='test_node_type',
+        type_hierarchy=[],
+        number_of_instances=1,
+        planned_number_of_instances=1,
+        deploy_number_of_instances=1,
+        properties={},
+        operations=dict((key, {}) for key in operations.NODE_OPERATIONS),
+        relationships=[relationship or get_relationship()],
+        min_number_of_instances=1,
+        max_number_of_instances=1,
+    )
+
+
+def get_dependent_node_instance(relationship_instance, dependent_node=None):
+    return models.NodeInstance(
+        id='dependent_node_instance',
+        host_id='dependent_node_instance',
+        deployment_id=DEPLOYMENT_ID,
+        runtime_properties={},
+        version=None,
+        relationship_instances=[relationship_instance or get_relationship_instance()],
+        node=dependent_node or get_dependency_node()
+    )
+
+
+def get_execution():
+    return models.Execution(
+        id=EXECUTION_ID,
+        status=models.Execution.STARTED,
+        deployment_id=DEPLOYMENT_ID,
+        workflow_id=WORKFLOW_ID,
+        blueprint_id=BLUEPRINT_ID,
+        started_at=datetime.now(),
+        parameters=None
+    )
+
+
+def get_deployment():
+    now = datetime.now()
+    return models.Deployment(
+        id=DEPLOYMENT_ID,
+        description=None,
+        created_at=now,
+        updated_at=now,
+        blueprint_id=BLUEPRINT_ID,
+        workflows={}
+    )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/mock/operations.py
----------------------------------------------------------------------
diff --git a/tests/mock/operations.py b/tests/mock/operations.py
new file mode 100644
index 0000000..407061f
--- /dev/null
+++ b/tests/mock/operations.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+NODE_OPERATIONS_INSTALL = [
+    'aria.interfaces.lifecycle.create',
+    'aria.interfaces.lifecycle.configure',
+    'aria.interfaces.lifecycle.start',
+    ]
+NODE_OPERATIONS_UNINSTALL = [
+    'aria.interfaces.lifecycle.stop',
+    'aria.interfaces.lifecycle.delete',
+]
+NODE_OPERATIONS = NODE_OPERATIONS_INSTALL + NODE_OPERATIONS_UNINSTALL
+
+RELATIONSHIP_OPERATIONS_INSTALL = [
+    'aria.interfaces.relationship_lifecycle.preconfigure',
+    'aria.interfaces.relationship_lifecycle.postconfigure',
+    'aria.interfaces.relationship_lifecycle.establish',
+    ]
+RELATIONSHIP_OPERATIONS_UNINSTALL = ['aria.interfaces.relationship_lifecycle.unlink']
+RELATIONSHIP_OPERATIONS = RELATIONSHIP_OPERATIONS_INSTALL + RELATIONSHIP_OPERATIONS_UNINSTALL

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/test_logger.py
----------------------------------------------------------------------
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 7891dd6..37731bb 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -80,7 +80,7 @@ def test_create_file_log_handler():
         assert handler.baseFilename == temp_file.name
         assert handler.maxBytes == 5 * 1000 * 1024
         assert handler.backupCount == 10
-        assert handler.delay is True
+        assert handler.stream is None
         assert handler.level == logging.DEBUG
         assert handler.formatter == _default_file_formatter
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/__init__.py b/tests/workflows/__init__.py
index ae1e83e..fe04b2f 100644
--- a/tests/workflows/__init__.py
+++ b/tests/workflows/__init__.py
@@ -12,3 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from . import api, builtin, core

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/api/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/__init__.py b/tests/workflows/api/__init__.py
new file mode 100644
index 0000000..09697dc
--- /dev/null
+++ b/tests/workflows/api/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py
new file mode 100644
index 0000000..7119529
--- /dev/null
+++ b/tests/workflows/api/test_task.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pytest
+
+from aria import context
+from aria.workflows import api
+
+from ... import mock
+
+
[email protected]()
+def ctx():
+    """
+    Create the following graph in storage:
+    dependency_node <------ dependent_node
+    :return:
+    """
+    simple_context = mock.context.simple()
+    dependency_node = mock.models.get_dependency_node()
+    dependency_node_instance = mock.models.get_dependency_node_instance(
+        dependency_node=dependency_node)
+
+    relationship = mock.models.get_relationship(dependency_node)
+    relationship_instance = mock.models.get_relationship_instance(
+        relationship=relationship,
+        target_instance=dependency_node_instance
+    )
+
+    dependent_node = mock.models.get_dependent_node(relationship)
+    dependent_node_instance = mock.models.get_dependent_node_instance(
+        dependent_node=dependent_node,
+        relationship_instance=relationship_instance
+    )
+
+    simple_context.model.node.store(dependent_node)
+    simple_context.model.node.store(dependency_node)
+    simple_context.model.node_instance.store(dependent_node_instance)
+    simple_context.model.node_instance.store(dependency_node_instance)
+    simple_context.model.relationship.store(relationship)
+    simple_context.model.relationship_instance.store(relationship_instance)
+    simple_context.model.execution.store(mock.models.get_execution())
+    simple_context.model.deployment.store(mock.models.get_deployment())
+
+    return simple_context
+
+
+class TestOperationTask(object):
+
+    def test_operation_task_creation(self):
+        workflow_context = mock.context.simple()
+
+        name = 'task_name'
+        op_details = {'operation_details': True}
+        node_instance = mock.models.get_dependency_node_instance()
+        inputs = {'inputs': True}
+
+        with context.workflow.current.push(workflow_context):
+            model_task = api.task.OperationTask(name=name,
+                                                operation_details=op_details,
+                                                node_instance=node_instance,
+                                                inputs=inputs)
+
+        assert model_task.name == name
+        assert model_task.operation_details == op_details
+        assert model_task.node_instance == node_instance
+        assert model_task.inputs == inputs
+
+
+class TestWorkflowTask(object):
+
+    def test_workflow_task_creation(self, ctx):
+
+        workspace = {}
+
+        mock_class = type('mock_class', (object,), {'test_attribute': True})
+
+        def sub_workflow(**kwargs):
+            workspace.update(kwargs)
+            return mock_class
+
+        with context.workflow.current.push(ctx):
+            workflow_task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg')
+            assert workflow_task.graph is mock_class
+            assert workflow_task.test_attribute is True

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/tests/workflows/api/test_task_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/test_task_graph.py b/tests/workflows/api/test_task_graph.py
new file mode 100644
index 0000000..0ec0b0e
--- /dev/null
+++ b/tests/workflows/api/test_task_graph.py
@@ -0,0 +1,745 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.workflows.api import task_graph, task
+
+
+class MockTask(task.BaseTask):
+    def __init__(self):
+        super(MockTask, self).__init__(ctx={})
+
+
[email protected]
+def graph():
+    return task_graph.TaskGraph(name='mock-graph')
+
+
+class TestTaskGraphTasks(object):
+
+    def test_add_task(self, graph):
+        task = MockTask()
+        add_result = graph.add_tasks(task)
+        assert add_result == [task]
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_add_empty_group(self, graph):
+        result = graph.add_tasks([])
+        assert result == []
+
+    def test_add_group(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        added_tasks = graph.add_tasks(*tasks)
+        assert added_tasks == tasks
+
+    def test_add_partially_existing_group(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        tasks = [MockTask(), task, MockTask()]
+        added_tasks = graph.add_tasks(*tasks)
+        assert added_tasks == [tasks[0], tasks[2]]
+
+    def test_add_recursively_group(self, graph):
+        recursive_group = [MockTask(), MockTask()]
+        tasks = [MockTask(), recursive_group, MockTask()]
+        added_tasks = graph.add_tasks(tasks)
+        assert added_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]]
+
+    def test_add_existing_task(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        # adding a task already in graph - should have no effect, and return False
+        add_result = graph.add_tasks(task)
+        assert add_result == []
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_remove_task(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        graph.remove_tasks(other_task)
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_remove_tasks_with_dependency(self, graph):
+        task = MockTask()
+        dependent_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(dependent_task)
+        graph.add_dependency(dependent_task, task)
+        remove_result = graph.remove_tasks(dependent_task)
+        assert remove_result == [dependent_task]
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+        # asserting no dependencies are left for the dependent task
+        assert len(list(graph.get_dependencies(task))) == 0
+
+    def test_remove_empty_group(self, graph):
+        result = graph.remove_tasks([])
+        assert result == []
+
+    def test_remove_group(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        graph.add_tasks(*tasks)
+        removed_tasks = graph.remove_tasks(*tasks)
+        assert removed_tasks == tasks
+
+    def test_remove_partially_existing_group(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        tasks = [MockTask(), task, MockTask()]
+        removed_tasks = graph.remove_tasks(*tasks)
+        assert removed_tasks == [task]
+
+    def test_remove_recursively_group(self, graph):
+        recursive_group = [MockTask(), MockTask()]
+        tasks = [MockTask(), recursive_group, MockTask()]
+        graph.add_tasks(tasks)
+        removed_tasks = graph.remove_tasks(tasks)
+        assert removed_tasks == [tasks[0], recursive_group[0], 
recursive_group[1], tasks[2]]
+
+    def test_remove_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        # removing a task not in graph - should have no effect, and return False
+        remove_result = graph.remove_tasks(task_not_in_graph)
+        assert remove_result == []
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_has_task(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        assert graph.has_tasks(task) is True
+
+    def test_has_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        assert graph.has_tasks(task_not_in_graph) is False
+
+    def test_has_empty_group(self, graph):
+        # the "empty task" is in the graph
+        assert graph.has_tasks([]) is True
+
+    def test_has_group(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        graph.add_tasks(*tasks)
+        assert graph.has_tasks(*tasks) is True
+
+    def test_has_partially_existing_group(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        tasks = [MockTask(), task, MockTask()]
+        assert graph.has_tasks(tasks) is False
+
+    def test_has_recursively_group(self, graph):
+        recursive_group = [MockTask(), MockTask()]
+        tasks = [MockTask(), recursive_group, MockTask()]
+        graph.add_tasks(tasks)
+        assert graph.has_tasks(tasks) is True
+
+    def test_get_task(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        assert graph.get_task(task.id) == task
+
+    def test_get_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            graph.get_task(task_not_in_graph.id)
+
+
+class TestTaskGraphGraphTraversal(object):
+
+    def test_tasks_iteration(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        tasks = [t for t in graph.tasks]
+        assert set(tasks) == set([task, other_task])
+
+    def test_get_dependents(self, graph):
+        task = MockTask()
+        dependent_task_1 = MockTask()
+        dependent_task_2 = MockTask()
+        transitively_dependent_task = MockTask()
+
+        graph.add_tasks(task)
+        graph.add_tasks(dependent_task_1)
+        graph.add_tasks(dependent_task_2)
+        graph.add_tasks(transitively_dependent_task)
+
+        graph.add_dependency(dependent_task_1, task)
+        graph.add_dependency(dependent_task_2, task)
+        graph.add_dependency(transitively_dependent_task, dependent_task_2)
+
+        dependent_tasks = list(graph.get_dependents(task))
+        # transitively_dependent_task not expected to appear in the result
+        assert set(dependent_tasks) == set([dependent_task_1, dependent_task_2])
+
+    def test_get_task_empty_dependents(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        dependent_tasks = list(graph.get_dependents(task))
+        assert len(dependent_tasks) == 0
+
+    def test_get_nonexistent_task_dependents(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            list(graph.get_dependents(task_not_in_graph))
+
+    def test_get_dependencies(self, graph):
+        task = MockTask()
+        dependency_task_1 = MockTask()
+        dependency_task_2 = MockTask()
+        transitively_dependency_task = MockTask()
+
+        graph.add_tasks(task)
+        graph.add_tasks(dependency_task_1)
+        graph.add_tasks(dependency_task_2)
+        graph.add_tasks(transitively_dependency_task)
+
+        graph.add_dependency(task, dependency_task_1)
+        graph.add_dependency(task, dependency_task_2)
+        graph.add_dependency(dependency_task_2, transitively_dependency_task)
+
+        dependency_tasks = list(graph.get_dependencies(task))
+        # transitively_dependency_task not expected to appear in the result
+        assert set(dependency_tasks) == set([dependency_task_1, dependency_task_2])
+
+    def test_get_task_empty_dependencies(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        dependency_tasks = list(graph.get_dependencies(task))
+        assert len(dependency_tasks) == 0
+
+    def test_get_nonexistent_task_dependencies(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            list(graph.get_dependencies(task_not_in_graph))
+
+
class TestTaskGraphDependencies(object):
    """Tests for adding, querying and removing task graph dependencies,
    for single tasks, task groups and (recursively) nested groups."""

    def test_add_dependency(self, graph):
        task = MockTask()
        dependency_task = MockTask()
        unrelated_task = MockTask()
        graph.add_tasks(task)
        graph.add_tasks(dependency_task)
        graph.add_tasks(unrelated_task)
        graph.add_dependency(task, dependency_task)
        assert graph.has_dependency(task, dependency_task) is True
        dependency_tasks = list(graph.get_dependencies(task))
        assert len(dependency_tasks) == 1
        assert dependency_tasks[0] == dependency_task

    def test_add_existing_dependency(self, graph):
        task = MockTask()
        dependency_task = MockTask()
        graph.add_tasks(task)
        graph.add_tasks(dependency_task)
        graph.add_dependency(task, dependency_task)
        assert graph.has_dependency(task, dependency_task) is True
        # re-adding a dependency already in the graph should have no effect:
        # the dependency remains, and no duplicate edge is created
        graph.add_dependency(task, dependency_task)
        assert graph.has_dependency(task, dependency_task) is True
        dependency_tasks = list(graph.get_dependencies(task))
        assert len(dependency_tasks) == 1
        assert dependency_tasks[0] == dependency_task

    def test_add_dependency_nonexistent_dependent(self, graph):
        task = MockTask()
        task_not_in_graph = MockTask()
        graph.add_tasks(task)
        with pytest.raises(task_graph.TaskNotInGraphError):
            graph.add_dependency(task_not_in_graph, task)

    def test_add_dependency_nonexistent_dependency(self, graph):
        task = MockTask()
        task_not_in_graph = MockTask()
        graph.add_tasks(task)
        with pytest.raises(task_graph.TaskNotInGraphError):
            graph.add_dependency(task, task_not_in_graph)

    def test_add_dependency_empty_dependent(self, graph):
        task = MockTask()
        graph.add_tasks(task)
        # adding a dependency with an empty dependent group should create
        # no dependency and leave the graph's task set untouched
        graph.add_dependency([], task)
        assert set(graph.tasks) == set((task,))

    def test_add_dependency_empty_dependency(self, graph):
        task = MockTask()
        graph.add_tasks(task)
        # adding a dependency with an empty dependency group should create
        # no dependency and leave the graph's task set untouched
        graph.add_dependency(task, [])
        assert set(graph.tasks) == set((task,))

    def test_add_dependency_dependent_group(self, graph):
        task = MockTask()
        group_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        graph.add_tasks(*group_tasks)
        graph.add_dependency(group_tasks, task)
        # each member of the dependent group now depends on the task
        for group_task in group_tasks:
            assert graph.has_dependency(group_task, task) is True

    def test_add_dependency_dependency_group(self, graph):
        task = MockTask()
        group_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        graph.add_tasks(*group_tasks)
        graph.add_dependency(task, group_tasks)
        # the task now depends on each member of the dependency group
        for group_task in group_tasks:
            assert graph.has_dependency(task, group_task) is True

    def test_add_dependency_between_groups(self, graph):
        group_1_tasks = [MockTask() for _ in xrange(3)]
        group_2_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(*group_1_tasks)
        graph.add_tasks(*group_2_tasks)
        graph.add_dependency(group_1_tasks, group_2_tasks)
        # every member of group 1 depends on every member of group 2
        for group_1_task in group_1_tasks:
            for group_2_task in group_2_tasks:
                assert graph.has_dependency(group_1_task, group_2_task) is True

    def test_add_dependency_dependency_group_with_some_existing_dependencies(self, graph):
        task = MockTask()
        group_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        graph.add_tasks(*group_tasks)
        # add a dependency on one specific group member manually,
        # before adding a dependency on the whole group
        graph.add_dependency(task, group_tasks[1])
        graph.add_dependency(task, group_tasks)
        for group_task in group_tasks:
            assert graph.has_dependency(task, group_task) is True

    def test_add_existing_dependency_between_groups(self, graph):
        group_1_tasks = [MockTask() for _ in xrange(3)]
        group_2_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(*group_1_tasks)
        graph.add_tasks(*group_2_tasks)
        graph.add_dependency(group_1_tasks, group_2_tasks)
        assert graph.has_dependency(group_1_tasks, group_2_tasks) is True
        # re-adding a group dependency already in the graph should have
        # no effect: the full cross-group dependency remains
        graph.add_dependency(group_1_tasks, group_2_tasks)
        assert graph.has_dependency(group_1_tasks, group_2_tasks) is True
        for group_1_task in group_1_tasks:
            for group_2_task in group_2_tasks:
                assert graph.has_dependency(group_1_task, group_2_task) is True

    def test_has_dependency(self, graph):
        task = MockTask()
        dependency_task = MockTask()
        graph.add_tasks(task)
        graph.add_tasks(dependency_task)
        graph.add_dependency(task, dependency_task)
        assert graph.has_dependency(task, dependency_task) is True

    def test_has_nonexistent_dependency(self, graph):
        task = MockTask()
        other_task = MockTask()
        graph.add_tasks(task)
        graph.add_tasks(other_task)
        assert graph.has_dependency(task, other_task) is False

    def test_has_dependency_nonexistent_dependent(self, graph):
        task = MockTask()
        task_not_in_graph = MockTask()
        graph.add_tasks(task)
        with pytest.raises(task_graph.TaskNotInGraphError):
            graph.has_dependency(task_not_in_graph, task)

    def test_has_dependency_nonexistent_dependency(self, graph):
        task = MockTask()
        task_not_in_graph = MockTask()
        graph.add_tasks(task)
        with pytest.raises(task_graph.TaskNotInGraphError):
            graph.has_dependency(task, task_not_in_graph)

    def test_has_dependency_empty_dependent(self, graph):
        task = MockTask()
        graph.add_tasks(task)
        # an empty dependent group can never constitute an existing dependency
        assert graph.has_dependency([], task) is False

    def test_has_dependency_empty_dependency(self, graph):
        task = MockTask()
        graph.add_tasks(task)
        # an empty dependency group can never constitute an existing dependency
        assert graph.has_dependency(task, []) is False

    def test_has_dependency_dependent_group(self, graph):
        task = MockTask()
        group_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        graph.add_tasks(*group_tasks)
        assert graph.has_dependency(group_tasks, task) is False
        graph.add_dependency(group_tasks, task)
        assert graph.has_dependency(group_tasks, task) is True

    def test_has_dependency_dependency_parallel(self, graph):
        task = MockTask()
        group_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        graph.add_tasks(*group_tasks)
        assert graph.has_dependency(task, group_tasks) is False
        graph.add_dependency(task, group_tasks)
        assert graph.has_dependency(task, group_tasks) is True

    def test_has_dependency_between_groups(self, graph):
        group_1_tasks = [MockTask() for _ in xrange(3)]
        group_2_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(*group_1_tasks)
        graph.add_tasks(*group_2_tasks)
        assert graph.has_dependency(group_2_tasks, group_1_tasks) is False
        graph.add_dependency(group_2_tasks, group_1_tasks)
        assert graph.has_dependency(group_2_tasks, group_1_tasks) is True

    def test_has_dependency_dependency_parallel_with_some_existing_dependencies(self, graph):
        task = MockTask()
        parallel_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        parallel = graph.add_tasks(*parallel_tasks)
        graph.add_dependency(task, parallel_tasks[1])
        # only a partial dependency exists - has_dependency is expected
        # to return False for the group as a whole
        assert graph.has_dependency(task, parallel) is False

    def test_has_nonexistent_dependency_between_groups(self, graph):
        group_1_tasks = [MockTask() for _ in xrange(3)]
        group_2_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(*group_1_tasks)
        graph.add_tasks(*group_2_tasks)
        assert graph.has_dependency(group_1_tasks, group_2_tasks) is False

    def test_remove_dependency(self, graph):
        task = MockTask()
        dependency_task = MockTask()
        another_dependent_task = MockTask()
        graph.add_tasks(task)
        graph.add_tasks(dependency_task)
        graph.add_tasks(another_dependent_task)
        graph.add_dependency(task, dependency_task)
        graph.add_dependency(another_dependent_task, dependency_task)

        graph.remove_dependency(task, dependency_task)
        # only the removed dependency is gone; the other dependent is unaffected
        assert graph.has_dependency(task, dependency_task) is False
        assert graph.has_dependency(another_dependent_task, dependency_task) is True

    def test_remove_nonexistent_dependency(self, graph):
        task = MockTask()
        dependency_task = MockTask()
        graph.add_tasks(task)
        graph.add_tasks(dependency_task)
        # removing a dependency not in the graph should have no effect
        graph.remove_dependency(task, dependency_task)
        assert graph.has_dependency(task, dependency_task) is False
        assert set(graph.tasks) == set([task, dependency_task])

    def test_remove_dependency_nonexistent_dependent(self, graph):
        task = MockTask()
        task_not_in_graph = MockTask()
        graph.add_tasks(task)
        with pytest.raises(task_graph.TaskNotInGraphError):
            graph.remove_dependency(task_not_in_graph, task)

    def test_remove_dependency_nonexistent_dependency(self, graph):
        # in this test the dependency *task* is not in the graph,
        # not just the dependency itself
        task = MockTask()
        task_not_in_graph = MockTask()
        graph.add_tasks(task)
        with pytest.raises(task_graph.TaskNotInGraphError):
            graph.remove_dependency(task, task_not_in_graph)

    def test_remove_dependency_empty_dependent(self, graph):
        task = MockTask()
        graph.add_tasks(task)
        # removing with an empty dependent group should have no effect
        graph.remove_dependency([], task)
        assert set(graph.tasks) == set((task,))

    def test_remove_dependency_empty_dependency(self, graph):
        task = MockTask()
        graph.add_tasks(task)
        # removing with an empty dependency group should have no effect
        graph.remove_dependency(task, [])
        assert set(graph.tasks) == set((task,))

    def test_remove_dependency_dependent_group(self, graph):
        task = MockTask()
        group_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        graph.add_tasks(*group_tasks)
        graph.add_dependency(group_tasks, task)
        graph.remove_dependency(group_tasks, task)
        assert graph.has_dependency(group_tasks, task) is False
        for group_task in group_tasks:
            assert graph.has_dependency(group_task, task) is False

    def test_remove_dependency_dependency_group(self, graph):
        task = MockTask()
        group_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        graph.add_tasks(*group_tasks)
        graph.add_dependency(task, group_tasks)
        graph.remove_dependency(task, group_tasks)
        assert graph.has_dependency(task, group_tasks) is False
        for group_task in group_tasks:
            assert graph.has_dependency(task, group_task) is False

    def test_remove_dependency_between_groups(self, graph):
        group_1_tasks = [MockTask() for _ in xrange(3)]
        group_2_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(*group_1_tasks)
        graph.add_tasks(*group_2_tasks)
        graph.add_dependency(group_2_tasks, group_1_tasks)
        graph.remove_dependency(group_2_tasks, group_1_tasks)
        assert graph.has_dependency(group_2_tasks, group_1_tasks) is False
        for group_2_task in group_2_tasks:
            for group_1_task in group_1_tasks:
                assert graph.has_dependency(group_2_task, group_1_task) is False

    def test_remove_dependency_dependency_group_with_some_existing_dependencies(self, graph):
        task = MockTask()
        group_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(task)
        graph.add_tasks(*group_tasks)
        graph.add_dependency(task, group_tasks[1])
        graph.remove_dependency(task, group_tasks)
        # only a partial dependency exists - removing the group dependency
        # is expected to have no effect
        assert graph.has_dependency(task, group_tasks) is False
        # none of the individual dependencies are expected to have changed
        assert graph.has_dependency(task, group_tasks[0]) is False
        assert graph.has_dependency(task, group_tasks[1]) is True
        assert graph.has_dependency(task, group_tasks[2]) is False

    def test_remove_nonexistent_dependency_between_groups(self, graph):
        group_1_tasks = [MockTask() for _ in xrange(3)]
        group_2_tasks = [MockTask() for _ in xrange(3)]
        graph.add_tasks(*group_1_tasks)
        graph.add_tasks(*group_2_tasks)
        # removing a group dependency not in the graph should have no effect
        graph.remove_dependency(group_2_tasks, group_1_tasks)
        assert graph.has_dependency(group_2_tasks, group_1_tasks) is False

    # nested tests

    def test_group_with_nested_sequence(self, graph):
        all_tasks = [MockTask() for _ in xrange(5)]
        graph.add_tasks(all_tasks[0],
                        graph.sequence(all_tasks[1], all_tasks[2], all_tasks[3]),
                        all_tasks[4])
        assert set(graph.tasks) == set(all_tasks)

        # tasks[2] and tasks[3] should each depend only on their predecessor
        # in the nested sequence; the rest should have no dependencies
        assert len(list(graph.get_dependencies(all_tasks[0]))) == 0
        assert len(list(graph.get_dependencies(all_tasks[1]))) == 0
        assert set(graph.get_dependencies(all_tasks[2])) == set([all_tasks[1]])
        assert set(graph.get_dependencies(all_tasks[3])) == set([all_tasks[2]])
        assert len(list(graph.get_dependencies(all_tasks[4]))) == 0

    def test_group_with_nested_group(self, graph):
        tasks = [MockTask() for _ in xrange(5)]
        graph.add_tasks(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4])
        assert set(graph.tasks) == set(tasks)
        # group membership alone creates no dependencies
        for group_task in tasks:
            assert len(list(graph.get_dependencies(group_task))) == 0

    def test_group_with_recursively_nested_group(self, graph):
        recursively_nested_tasks = [MockTask(), MockTask(), MockTask()]
        nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks]
        tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
        graph.add_tasks(*tasks)

        assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks)
        # group membership alone creates no dependencies at any nesting level
        for tasks_list in [tasks, nested_tasks, recursively_nested_tasks]:
            for list_task in tasks_list[:3]:
                assert len(list(graph.get_dependencies(list_task))) == 0

    def test_group_with_recursively_nested_group_and_interdependencies(self, graph):
        recursively_nested_tasks = [MockTask(), MockTask(), MockTask()]
        nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks]
        tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
        graph.add_tasks(*tasks)

        # dependencies may cross nesting levels freely
        graph.add_dependency(tasks[2], nested_tasks[2])
        graph.add_dependency(nested_tasks[1], recursively_nested_tasks[0])
        graph.add_dependency(recursively_nested_tasks[1], tasks[0])

        assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks)
        assert set(graph.get_dependencies(tasks[0])) == set()
        assert set(graph.get_dependencies(tasks[1])) == set()
        assert set(graph.get_dependencies(tasks[2])) == set([nested_tasks[2]])

        assert set(graph.get_dependencies(nested_tasks[0])) == set()
        assert set(graph.get_dependencies(nested_tasks[1])) == set([recursively_nested_tasks[0]])
        assert set(graph.get_dependencies(nested_tasks[2])) == set()

        assert set(graph.get_dependencies(recursively_nested_tasks[0])) == set()
        assert set(graph.get_dependencies(recursively_nested_tasks[1])) == set([tasks[0]])
        assert set(graph.get_dependencies(recursively_nested_tasks[2])) == set()
+
+
class TestTaskGraphSequence(object):
    """Tests for graph.sequence(), including nested groups and sequences."""

    def test_sequence(self, graph):
        first, second, third = MockTask(), MockTask(), MockTask()
        graph.sequence(first, second, third)
        assert set(graph.tasks) == set([first, second, third])
        # each task depends exactly on its predecessor in the sequence
        assert not list(graph.get_dependencies(first))
        assert set(graph.get_dependencies(second)) == set([first])
        assert set(graph.get_dependencies(third)) == set([second])

    def test_sequence_with_some_tasks_and_dependencies_already_in_graph(self, graph):
        # verifies both that tasks not yet in the graph get inserted, and
        # that tasks already present are not re-added
        tasks = [MockTask(), MockTask(), MockTask()]
        # pre-populate the graph with some of the tasks and a dependency
        graph.add_tasks(tasks[1])
        graph.add_tasks(tasks[2])
        graph.add_dependency(tasks[2], tasks[1])

        graph.sequence(*tasks)
        assert set(graph.tasks) == set(tasks)
        assert not list(graph.get_dependencies(tasks[0]))
        assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]])
        assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]])

    def test_sequence_with_nested_sequence(self, graph):
        tasks = [MockTask() for _ in xrange(5)]
        graph.sequence(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), tasks[4])
        assert set(graph.tasks) == set(tasks)
        # dependency counts accumulate through the nested sequence;
        # the first task has none
        expected_dependency_counts = [0, 1, 2, 2, 3]
        for sequence_task, expected_count in zip(tasks, expected_dependency_counts):
            assert len(list(graph.get_dependencies(sequence_task))) == expected_count

    def test_sequence_with_nested_group(self, graph):
        tasks = [MockTask() for _ in xrange(5)]
        graph.sequence(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4])
        assert set(graph.tasks) == set(tasks)
        # the first task has no dependencies
        assert not list(graph.get_dependencies(tasks[0]))
        # every group member depends solely on the first task
        for group_member in tasks[1:4]:
            assert set(graph.get_dependencies(group_member)) == set([tasks[0]])
        # the last task depends on every group member
        assert set(graph.get_dependencies(tasks[4])) == set(tasks[1:4])

    def test_sequence_with_recursively_nested_group(self, graph):
        recursively_nested_group = [MockTask(), MockTask()]
        nested_group = [MockTask(), recursively_nested_group, MockTask()]
        sequence_tasks = [MockTask(), nested_group, MockTask()]

        graph.sequence(*sequence_tasks)
        # the middle group is flattened recursively into its member tasks
        flattened_middle = [nested_group[0],
                            recursively_nested_group[0],
                            recursively_nested_group[1],
                            nested_group[2]]
        assert set(graph.tasks) == set(
            [sequence_tasks[0], sequence_tasks[2]] + flattened_middle)

        # each middle task depends on the first task and is depended
        # upon by the last task
        for middle_task in flattened_middle:
            assert list(graph.get_dependencies(middle_task)) == [sequence_tasks[0]]
            assert list(graph.get_dependents(middle_task)) == [sequence_tasks[2]]

    def test_sequence_with_empty_group(self, graph):
        tasks = [MockTask(), [], MockTask()]
        graph.sequence(*tasks)
        assert set(graph.tasks) == set([tasks[0], tasks[2]])
        # the empty group is skipped, linking its neighbours directly
        assert list(graph.get_dependents(tasks[0])) == [tasks[2]]
        assert list(graph.get_dependencies(tasks[2])) == [tasks[0]]

    def test_sequence_with_recursively_nested_sequence_and_interdependencies(self, graph):
        recursively_nested_tasks = list(graph.sequence(MockTask(), MockTask(), MockTask()))
        nested_tasks = list(graph.sequence(MockTask(),
                                           MockTask(),
                                           MockTask(),
                                           recursively_nested_tasks))
        tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
        graph.sequence(*tasks)

        assert set(graph.tasks) == set(
            tasks[:3] + nested_tasks[:3] + recursively_nested_tasks)

        # top-level: each task depends on its predecessor only
        assert set(graph.get_dependencies(tasks[0])) == set()
        for index in xrange(1, 3):
            assert set(graph.get_dependencies(tasks[index])) == set([tasks[index - 1]])

        # nested level: each task also inherits a dependency on tasks[2]
        assert set(graph.get_dependencies(nested_tasks[0])) == set([tasks[2]])
        for index in xrange(1, 3):
            assert set(graph.get_dependencies(nested_tasks[index])) == \
                set([tasks[2], nested_tasks[index - 1]])

        # recursively nested level: dependencies on tasks[2] and nested_tasks[2]
        # accumulate on top of the inner-sequence predecessor
        assert set(graph.get_dependencies(recursively_nested_tasks[0])) == \
            set([tasks[2], nested_tasks[2]])
        for index in xrange(1, 2):
            assert set(graph.get_dependencies(recursively_nested_tasks[index])) == \
                set([tasks[2], nested_tasks[2], recursively_nested_tasks[index - 1]])

Reply via email to