Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-9-API-for-operation-context c3c0aa9b0 -> 03a9de706 (forced update)
ARIATOSCA-9-API-for-operation-context(wip) Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/03a9de70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/03a9de70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/03a9de70 Branch: refs/heads/ARIA-9-API-for-operation-context Commit: 03a9de70606ee69409557839f9fe225cab015756 Parents: 8947f72 Author: mxmrlv <[email protected]> Authored: Thu Oct 27 20:39:09 2016 +0300 Committer: mxmrlv <[email protected]> Committed: Sun Nov 13 18:34:00 2016 +0200 ---------------------------------------------------------------------- aria/context/__init__.py | 1 + aria/context/common.py | 123 ++++++++++++++++++ aria/context/operation.py | 91 ++++++++----- aria/context/toolbelt.py | 73 +++++++++++ aria/context/workflow.py | 110 +--------------- aria/decorators.py | 39 ++---- aria/events/builtin_event_handler.py | 8 +- aria/storage/models.py | 13 +- aria/storage/structures.py | 2 +- aria/workflows/api/task.py | 13 +- aria/workflows/builtin/heal.py | 8 +- aria/workflows/builtin/workflows.py | 65 ++++------ aria/workflows/core/__init__.py | 2 +- aria/workflows/core/task.py | 89 +++++++++---- aria/workflows/executor/__init__.py | 2 + aria/workflows/executor/blocking.py | 2 +- aria/workflows/executor/celery.py | 1 + aria/workflows/executor/multiprocess.py | 5 +- aria/workflows/executor/thread.py | 2 +- tests/context/__init__.py | 56 ++++++++ tests/context/test_operation.py | 129 +++++++++++++++++++ tests/context/test_toolbelt.py | 129 +++++++++++++++++++ tests/mock/context.py | 2 + tests/mock/models.py | 48 +++++-- tests/storage/test_models.py | 3 + tests/workflows/api/test_task.py | 4 +- tests/workflows/core/test_executor.py | 9 +- tests/workflows/core/test_task.py | 110 ++++++++++++++++ .../test_task_graph_into_exececution_graph.py | 6 +- tests/workflows/test_engine.py | 36 +++--- 30 files changed, 886 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/context/__init__.py ---------------------------------------------------------------------- diff --git a/aria/context/__init__.py b/aria/context/__init__.py index 20e19db..ad89b13 100644 --- a/aria/context/__init__.py +++ b/aria/context/__init__.py @@ -18,3 +18,4 @@ Provides contexts to workflow and operation """ from . import workflow, operation +from .toolbelt import toolbelt http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/context/common.py ---------------------------------------------------------------------- diff --git a/aria/context/common.py b/aria/context/common.py new file mode 100644 index 0000000..79360a3 --- /dev/null +++ b/aria/context/common.py @@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from uuid import uuid4 + +from .. import logger +from ..tools.lru_cache import lru_cache + + +class BaseContext(logger.LoggerMixin): + """ + Context object used during workflow creation and execution + """ + + def __init__( + self, + name, + model_storage, + resource_storage, + deployment_id, + workflow_id, + execution_id=None, + parameters=None, + **kwargs): + super(BaseContext, self).__init__(**kwargs) + self.name = name + self.id = str(uuid4()) + self._model = model_storage + self._resource = resource_storage + self._deployment_id = deployment_id + self._workflow_id = workflow_id + self._execution_id = execution_id or str(uuid4()) + self.parameters = parameters or {} + + def __repr__(self): + return ( + '{name}(name={self.name}, ' + 'deployment_id={self._deployment_id}, ' + 'workflow_id={self._workflow_id}, ' + 'execution_id={self._execution_id})' + .format(name=self.__class__.__name__, self=self)) + + @property + def model(self): + return self._model + + @property + def resource(self): + return self._resource + + @property + @lru_cache() + def blueprint(self): + """ + The blueprint model + """ + return self.model.blueprint.get(self.deployment.blueprint_id) + + @property + @lru_cache() + def deployment(self): + """ + The deployment model + """ + return self.model.deployment.get(self._deployment_id) + + @property + def execution(self): + """ + The execution model + """ + return self.model.execution.get(self._execution_id) + + @execution.setter + def execution(self, value): + """ + Store the execution in the model storage + """ + self.model.execution.store(value) + + def download_blueprint_resource(self, destination, path=None): + """ + Download a blueprint resource from the resource storage + """ + return self.resource.blueprint.download( + entry_id=self.blueprint.id, + destination=destination, + path=path) + + def download_deployment_resource(self, destination, path=None): + """ + Download a deployment resource from the resource storage + """ + return self.resource.deployment.download( + entry_id=self._deployment_id, + destination=destination, + path=path) + + @lru_cache() + def get_deployment_resource_data(self, path=None): + """ + Read a deployment resource as string from the resource storage + """ + return self.resource.deployment.data(entry_id=self._deployment_id, path=path) + + @lru_cache() + def get_blueprint_resource_data(self, path=None): + """ + Read a blueprint resource as string from the resource storage + """ + return self.resource.blueprint.data(entry_id=self._deployment_id, path=path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/context/operation.py b/aria/context/operation.py index d4d229a..9ffcab1 100644 --- a/aria/context/operation.py +++ b/aria/context/operation.py @@ -17,52 +17,73 @@ Workflow and operation contexts """ -from uuid import uuid4 -from aria.logger import LoggerMixin +from .common import BaseContext -class OperationContext(LoggerMixin): + +class BaseOperationContext(BaseContext): """ Context object used during operation creation and execution """ - def __init__( - self, - name, - operation_details, - workflow_context, - node_instance, - inputs=None): - super(OperationContext, self).__init__() - self.name = name - self.id = str(uuid4()) - self.operation_details = operation_details - self.workflow_context = workflow_context - self.node_instance = node_instance - self.inputs = inputs or {} + def __init__(self, name, workflow_context, task, **kwargs): + super(BaseOperationContext, self).__init__( + name=name, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + deployment_id=workflow_context._deployment_id, + workflow_id=workflow_context._workflow_id, + execution_id=workflow_context._execution_id, + parameters=workflow_context.parameters, + **kwargs) + self._workflow_context = workflow_context + self._task_model = task + self._operation_executor = self.task.operation_executor def __repr__(self): details = ', '.join( '{0}={1}'.format(key, value) - for key, value in self.operation_details.items()) + for key, value in self.task.operation_details.items()) return '{name}({0})'.format(details, name=self.name) - def __getattr__(self, attr): - try: - return getattr(self.workflow_context, attr) - except AttributeError: - return super(OperationContext, self).__getattribute__(attr) + @property + def task(self): + return self._task_model + + +class NodeOperationContext(BaseOperationContext): + + @property + def node(self): + return self._operation_executor.node + + @property + def node_instance(self): + return self._operation_executor + + +class RelationshipOperationContext(BaseOperationContext): + + @property + def node(self): + return self.model.node.get(self.relationship.source_id) + + @property + def node_instance(self): + return self.model.node_instance.get(self.relationship_instance.source_id) + + @property + def target_node(self): + return self.model.node.get(self.relationship.target_id) + + @property + def target_node_instance(self): + return self.model.node_instance.get(self._operation_executor.target_id) + + @property + def relationship(self): + return self._operation_executor.relationship @property - def operation(self): - """ - The model operation - """ - return self.storage.operation.get(self.id) - - @operation.setter - def operation(self, value): - """ - Store the operation in the model storage - """ - self.storage.operation.store(value) + def relationship_instance(self): + return self._operation_executor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/context/toolbelt.py b/aria/context/toolbelt.py new file mode 100644 index 0000000..0ab1b7a --- /dev/null +++ b/aria/context/toolbelt.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from contextlib import contextmanager + +from .. import exceptions +from . import operation + + +class _Toolbelt(object): + + def __init__(self): + self._op_context = None + self._workflow_context = None + + @contextmanager + def use(self, operation_context): + assert isinstance(operation_context, operation.BaseOperationContext) + _op_context = self._op_context + _workflow_context = self._workflow_context + + self._op_context = operation_context + self._workflow_context = operation_context._workflow_context + try: + yield self + finally: + self._op_context = _op_context + self._workflow_context = _workflow_context + + +class _OperationToolBelt(_Toolbelt): + + @property + def relationships_to_me(self): + assert isinstance(self._op_context, operation.NodeOperationContext) + for node_instance in self._workflow_context.node_instances: + for relationship_instance in node_instance.relationship_instances: + if relationship_instance.target_id == self._op_context.node_instance.id: + yield relationship_instance + + @property + def host_ip(self): + assert isinstance(self._op_context, operation.NodeOperationContext) + host_id = self._op_context._operation_executor.host_id + host_instance = self._workflow_context.model.node_instance.get(host_id) + return host_instance.runtime_properties.get('ip') + + +class _RelationshipToolBelt(_Toolbelt): + pass + +_operation_toolbelt = _OperationToolBelt() +_relationship_toolbelt = _RelationshipToolBelt() + + +def toolbelt(operation_context): + if isinstance(operation_context, operation.NodeOperationContext): + return _operation_toolbelt.use(operation_context) + elif isinstance(operation_context, operation.RelationshipOperationContext): + return _relationship_toolbelt.use(operation_context) + else: + raise exceptions.TaskException("Operation context not supported") http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py index 8183d42..9b2027e 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -18,12 +18,11 @@ Workflow and operation contexts """ import threading -from uuid import uuid4 from contextlib import contextmanager -from .. import logger -from ..tools.lru_cache import lru_cache -from .. import exceptions +from aria import exceptions + +from .common import BaseContext class ContextException(exceptions.AriaError): @@ -33,121 +32,24 @@ class ContextException(exceptions.AriaError): pass -class WorkflowContext(logger.LoggerMixin): +class WorkflowContext(BaseContext): """ Context object used during workflow creation and execution """ - def __init__( - self, - name, - model_storage, - resource_storage, - deployment_id, - workflow_id, - execution_id=None, - parameters=None, - **kwargs): - super(WorkflowContext, self).__init__(**kwargs) - self.name = name - self.id = str(uuid4()) - self.model = model_storage - self.resource = resource_storage - self.deployment_id = deployment_id - self.workflow_id = workflow_id - self.execution_id = execution_id or str(uuid4()) - self.parameters = parameters or {} - - def __repr__(self): - return ( - '{name}(deployment_id={self.deployment_id}, ' - 'workflow_id={self.workflow_id}, ' - 'execution_id={self.execution_id})'.format( - name=self.__class__.__name__, self=self)) - - @property - def blueprint_id(self): - """ - The blueprint id - """ - return self.deployment.blueprint_id - - @property - @lru_cache() - def blueprint(self): - """ - The blueprint model - """ - return self.model.blueprint.get(self.blueprint_id) - - @property - @lru_cache() - def deployment(self): - """ - The deployment model - """ - return self.model.deployment.get(self.deployment_id) - @property def nodes(self): """ Iterator over nodes """ - return self.model.node.iter( - filters={'blueprint_id': self.blueprint_id}) + return self.model.node.iter(filters={'blueprint_id': self.blueprint.id}) @property def node_instances(self): """ Iterator over node instances """ - return self.model.node_instance.iter(filters={'deployment_id': self.deployment_id}) - - @property - def execution(self): - """ - The execution model - """ - return self.model.execution.get(self.execution_id) - - @execution.setter - def execution(self, value): - """ - Store the execution in the model storage - """ - self.model.execution.store(value) - - def download_blueprint_resource(self, destination, path=None): - """ - Download a blueprint resource from the resource storage - """ - return self.resource.blueprint.download( - entry_id=self.blueprint_id, - destination=destination, - path=path) - - def download_deployment_resource(self, destination, path=None): - """ - Download a deployment resource from the resource storage - """ - return self.resource.deployment.download( - entry_id=self.deployment_id, - destination=destination, - path=path) - - @lru_cache() - def get_deployment_resource_data(self, path=None): - """ - Read a deployment resource as string from the resource storage - """ - return self.resource.deployment.data(entry_id=self.deployment_id, path=path) - - @lru_cache() - def get_blueprint_resource_data(self, path=None): - """ - Read a blueprint resource as string from the resource storage - """ - return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path) + return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id}) class _CurrentContext(threading.local): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/decorators.py ---------------------------------------------------------------------- diff --git a/aria/decorators.py b/aria/decorators.py index a07e2ee..219f3c4 100644 --- a/aria/decorators.py +++ b/aria/decorators.py @@ -25,23 +25,17 @@ from .workflows.api import task_graph from .tools.validation import validate_function_arguments -def workflow( - func=None, - simple_workflow=True, - suffix_template=''): +def workflow(func=None, suffix_template=''): """ Workflow decorator """ if func is None: - return partial( - workflow, - simple_workflow=simple_workflow, - suffix_template=suffix_template) + return partial(workflow, suffix_template=suffix_template) @wraps(func) def _wrapper(ctx, **workflow_parameters): - workflow_name = _generate_workflow_name( + workflow_name = _generate_name( func_name=func.__name__, suffix_template=suffix_template, ctx=ctx, @@ -56,34 +50,25 @@ def workflow( return _wrapper -def operation( - func=None): +def operation(func=None, toolbelt=False, suffix_template=''): """ Operation decorator """ if func is None: - return partial(operation) + return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt) @wraps(func) - def _wrapper(ctx, **custom_kwargs): - func_kwargs = _create_func_kwargs( - custom_kwargs, - ctx) - validate_function_arguments(func, func_kwargs) - ctx.description = func.__doc__ - return func(**func_kwargs) + def _wrapper(**func_kwargs): + with context.toolbelt(func_kwargs.get('ctx')) as tb: + if toolbelt: + func_kwargs.setdefault('toolbelt', tb) + validate_function_arguments(func, func_kwargs) + return func(**func_kwargs) return _wrapper -def _generate_workflow_name(func_name, ctx, suffix_template, **custom_kwargs): +def _generate_name(func_name, ctx, suffix_template, **custom_kwargs): return '{func_name}.{suffix}'.format( func_name=func_name, suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4())) - -def _create_func_kwargs( - kwargs, - ctx, - workflow_name=None): - kwargs.setdefault('graph', ctx.task_graph(workflow_name)) - return kwargs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index 2abdd9f..cfcb185 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -65,10 +65,10 @@ def _task_succeeded(task, *args, **kwargs): def _workflow_started(workflow_context, *args, **kwargs): execution_cls = workflow_context.model.execution.model_cls execution = execution_cls( - id=workflow_context.execution_id, - deployment_id=workflow_context.deployment_id, - workflow_id=workflow_context.workflow_id, - blueprint_id=workflow_context.blueprint_id, + id=workflow_context._execution_id, + deployment_id=workflow_context.deployment.id, + workflow_id=workflow_context._workflow_id, + blueprint_id=workflow_context.blueprint.id, status=execution_cls.PENDING, started_at=datetime.utcnow(), parameters=workflow_context.parameters, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 9aa7cf0..42272e1 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -221,6 +221,7 @@ class Relationship(Model): A Model which represents a relationship """ id = Field(type=basestring, default=uuid_generator) + source_id = Field(type=basestring) target_id = Field(type=basestring) source_interfaces = Field(type=dict) source_operations = Field(type=dict) @@ -263,18 +264,24 @@ class Node(Model): # todo: maybe add here Exception if isn't exists (didn't yield one's) -class RelationshipInstance(Model): +class Instance(Model): + id = Field(type=basestring, default=uuid_generator) + + +class RelationshipInstance(Instance): """ A Model which represents a relationship instance """ id = Field(type=basestring, default=uuid_generator) target_id = Field(type=basestring) target_name = Field(type=basestring) + source_id = Field(type=basestring) + source_name = Field(type=basestring) type = Field(type=basestring) relationship = PointerField(type=Relationship) -class NodeInstance(Model): +class NodeInstance(Instance): """ A Model which represents a node instance """ @@ -401,5 +408,5 @@ class Task(Model): # Operation specific fields name = Field(type=basestring) operation_details = Field(type=dict) - node_instance = PointerField(type=NodeInstance) + operation_executor = Field() inputs = Field(type=dict, default=lambda: {}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/storage/structures.py ---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py index ea4cf3a..9ba63d8 100644 --- a/aria/storage/structures.py +++ b/aria/storage/structures.py @@ -26,7 +26,7 @@ classes: * IterPointerField - represents an iterable pointers field. * Model - abstract model implementation. """ - +import collections import json from uuid import uuid4 from itertools import count http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py index 1070d99..31906ba 100644 --- a/aria/workflows/api/task.py +++ b/aria/workflows/api/task.py @@ -18,7 +18,10 @@ Provides the tasks to be enterd into the task graph """ from uuid import uuid4 -from ... import context +from ... import ( + context, + storage, +) class BaseTask(object): @@ -57,19 +60,21 @@ class OperationTask(BaseTask): def __init__(self, name, operation_details, - node_instance, + operation_executor, inputs=None): """ Creates an operation task using the name, details, node instance and any additional kwargs. :param name: the operation of the name. :param operation_details: the details for the operation. - :param node_instance: the node instance on which this operation is registered. + :param operation_executor: the operation host on which this operation is registered. :param inputs: operation inputs. """ + assert isinstance(operation_executor, (storage.models.NodeInstance, + storage.models.RelationshipInstance)) super(OperationTask, self).__init__() self.name = name self.operation_details = operation_details - self.node_instance = node_instance + self.operation_executor = operation_executor self.inputs = inputs or {} http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/heal.py b/aria/workflows/builtin/heal.py index dc320dc..7174de3 100644 --- a/aria/workflows/builtin/heal.py +++ b/aria/workflows/builtin/heal.py @@ -99,12 +99,10 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances): if target_node_instance in failing_node_instances: dependency = relationship_tasks( - graph=graph, node_instance=node_instance, relationship_instance=relationship_instance, - context=ctx, operation_name='aria.interfaces.relationship_lifecycle.unlink') - + graph.add_tasks(*dependency) graph.add_dependency(node_instance_sub_workflow, dependency) @@ -154,12 +152,10 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): if target_node_instance in failing_node_instances: dependent = relationship_tasks( - graph=graph, node_instance=node_instance, relationship_instance=relationship_instance, - context=ctx, operation_name='aria.interfaces.relationship_lifecycle.establish') - + graph.add_tasks(*dependent) graph.add_dependency(dependent, node_instance_sub_workflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/workflows.py b/aria/workflows/builtin/workflows.py index fc54f75..ee866f0 100644 --- a/aria/workflows/builtin/workflows.py +++ b/aria/workflows/builtin/workflows.py @@ -34,7 +34,7 @@ __all__ = ( # Install node instance workflow and sub workflows @workflow(suffix_template='{node_instance.id}') -def install_node_instance(ctx, graph, node_instance): +def install_node_instance(graph, node_instance, **kwargs): """ A workflow which installs a node instance. :param WorkflowContext ctx: the workflow context @@ -45,32 +45,32 @@ def install_node_instance(ctx, graph, node_instance): create_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.create.{0}'.format(node_instance.id), operation_details=node_instance.node.operations['aria.interfaces.lifecycle.create'], - node_instance=node_instance + operation_executor=node_instance ) configure_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.configure.{0}'.format(node_instance.id), operation_details=node_instance.node.operations['aria.interfaces.lifecycle.configure'], - node_instance=node_instance + operation_executor=node_instance ) start_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.start.{0}'.format(node_instance.id), operation_details=node_instance.node.operations['aria.interfaces.lifecycle.start'], - node_instance=node_instance + operation_executor=node_instance ) graph.sequence( create_node_instance, - preconfigure_relationship(graph, ctx, node_instance), + preconfigure_relationship(graph, node_instance), configure_node_instance, - postconfigure_relationship(graph, ctx, node_instance), + postconfigure_relationship(graph, node_instance), start_node_instance, - establish_relationship(graph, ctx, node_instance) + establish_relationship(graph, node_instance) ) return graph -def preconfigure_relationship(graph, ctx, node_instance): +def preconfigure_relationship(graph, node_instance, **kwargs): """ :param context: @@ -81,11 +81,10 @@ def preconfigure_relationship(graph, ctx, node_instance): return relationships_tasks( graph=graph, operation_name='aria.interfaces.relationship_lifecycle.preconfigure', - context=ctx, node_instance=node_instance) -def postconfigure_relationship(graph, ctx, node_instance): +def postconfigure_relationship(graph, node_instance, **kwargs): """ :param context: @@ -96,11 +95,10 @@ def postconfigure_relationship(graph, ctx, node_instance): return relationships_tasks( graph=graph, operation_name='aria.interfaces.relationship_lifecycle.postconfigure', - context=ctx, node_instance=node_instance) -def establish_relationship(graph, ctx, node_instance): +def establish_relationship(graph, node_instance, **kwargs): """ :param context: @@ -111,14 +109,13 @@ def establish_relationship(graph, ctx, node_instance): return relationships_tasks( graph=graph, operation_name='aria.interfaces.relationship_lifecycle.establish', - context=ctx, node_instance=node_instance) # Uninstall node instance workflow and subworkflows @workflow(suffix_template='{node_instance.id}') -def uninstall_node_instance(ctx, graph, node_instance): +def uninstall_node_instance(graph, node_instance, **kwargs): """ A workflow which uninstalls a node instance. :param WorkflowContext context: the workflow context @@ -129,22 +126,22 @@ def uninstall_node_instance(ctx, graph, node_instance): stop_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.stop.{0}'.format(node_instance.id), operation_details=node_instance.node.operations['aria.interfaces.lifecycle.stop'], - node_instance=node_instance + operation_executor=node_instance, ) delete_node_instance = task.OperationTask( name='aria.interfaces.lifecycle.delete.{0}'.format(node_instance.id), operation_details=node_instance.node.operations['aria.interfaces.lifecycle.delete'], - node_instance=node_instance + operation_executor=node_instance ) graph.sequence( stop_node_instance, - unlink_relationship(graph, ctx, node_instance), + unlink_relationship(graph, node_instance), delete_node_instance ) -def unlink_relationship(graph, ctx, node_instance): +def unlink_relationship(graph, node_instance): """ :param context: @@ -155,7 +152,6 @@ def unlink_relationship(graph, ctx, node_instance): return relationships_tasks( graph=graph, operation_name='aria.interfaces.relationship_lifecycle.unlink', - context=ctx, node_instance=node_instance ) @@ -167,8 +163,6 @@ def execute_operation_on_instance( allow_kwargs_override): """ A workflow which executes a single operation - :param WorkflowContext context: the workflow to execute. - :param TaskGraph graph: the tasks graph of which to edit :param node_instance: the node instance to install :param basestring operation: the operation name :param dict operation_kwargs: @@ -186,15 +180,11 @@ def execute_operation_on_instance( return task.OperationTask( name=task_name, operation_details=node_instance.node.operations[operation], - node_instance=node_instance, + operation_executor=node_instance, inputs=operation_kwargs) - -def relationships_tasks(graph, - operation_name, - context, - node_instance): +def relationships_tasks(graph, operation_name, node_instance): """ Creates a relationship task (source and target) for all of a node_instance relationships. :param basestring operation_name: the relationship operation name. @@ -210,10 +200,8 @@ def relationships_tasks(graph, for index, (_, relationship_group) in enumerate(relationships_groups): for relationship_instance in relationship_group: relationship_operations = relationship_tasks( - graph=graph, node_instance=node_instance, relationship_instance=relationship_instance, - context=context, operation_name=operation_name, index=index) sub_tasks.append(relationship_operations) @@ -221,12 +209,7 @@ def relationships_tasks(graph, return graph.sequence(*sub_tasks) -def relationship_tasks(graph, - node_instance, - relationship_instance, - context, - operation_name, - index=None): +def relationship_tasks(node_instance, relationship_instance, operation_name, index=None): """ Creates a relationship task source and target. :param NodeInstance node_instance: the node instance of the relationship @@ -245,12 +228,10 @@ def relationship_tasks(graph, ) source_operation = task.OperationTask( name=operation_name_template.format('source'), - node_instance=node_instance, - operation_details=relationship_instance.relationship.source_operations[ - operation_name]) + operation_executor=relationship_instance, + operation_details=relationship_instance.relationship.source_operations[operation_name]) target_operation = task.OperationTask( name=operation_name_template.format('target'), - node_instance=context.model.node_instance.get(relationship_instance.target_id), - operation_details=relationship_instance.relationship.target_operations[ - operation_name]) - return graph.add_tasks(source_operation, target_operation) + operation_executor=relationship_instance, + operation_details=relationship_instance.relationship.target_operations[operation_name]) + return source_operation, target_operation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/__init__.py b/aria/workflows/core/__init__.py index 646f44a..e377153 100644 --- a/aria/workflows/core/__init__.py +++ b/aria/workflows/core/__init__.py @@ -17,4 +17,4 @@ Core for the workflow execution mechanism """ -from . import task +from . import task, translation, engine http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py index fc72b59..d46c37f 100644 --- a/aria/workflows/core/task.py +++ b/aria/workflows/core/task.py @@ -18,12 +18,27 @@ Workflow tasks """ from contextlib import contextmanager from datetime import datetime +from functools import partial +from functools import wraps from ... import logger from ...storage import models +from ...context import operation as operation_context from .. import exceptions +def _locked(func=None): + if func is None: + return partial(_locked, func=_locked) + + @wraps(func) + def _wrapper(self, value, **kwargs): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + return func(self, value, **kwargs) + return _wrapper + + class BaseTask(logger.LoggerMixin): """ Base class for Task objects @@ -80,26 +95,39 @@ class EndSubWorkflowTask(BaseWorkflowTask): pass -class OperationTask(BaseTask, logger.LoggerMixin): +class OperationTask(BaseTask): """ Operation tasks """ - def __init__(self, api_task, *args, **kwargs): + def __init__(self, api_task, **kwargs): super(OperationTask, self).__init__(id=api_task.id, **kwargs) self._workflow_ctx = api_task.workflow_context task_model = api_task.workflow_context.model.task.model_cls - task = task_model( + operation_task = task_model( + id=api_task.id, name=api_task.name, operation_details=api_task.operation_details, - node_instance=api_task.node_instance, + operation_executor=api_task.operation_executor, inputs=api_task.inputs, status=task_model.PENDING, - execution_id=self.workflow_context.execution_id, + execution_id=self.workflow_context._execution_id, max_retries=self.workflow_context.parameters.get('max_retries', 1), ) - self.workflow_context.model.task.store(task) - self._task_id = task.id + + if isinstance(api_task.operation_executor, models.NodeInstance): + context_class = operation_context.NodeOperationContext + elif isinstance(api_task.operation_executor, models.RelationshipInstance): + context_class = operation_context.RelationshipOperationContext + else: + context_class = None + + self._ctx = context_class(name=api_task.name, + workflow_context=self.workflow_context, + task=operation_task) + + self.workflow_context.model.task.store(operation_task) + self._task_id = operation_task.id self._update_fields = None @contextmanager @@ -111,10 +139,10 @@ class OperationTask(BaseTask, logger.LoggerMixin): self._update_fields = {} try: yield - task = self.context + task = self.model_context for key, value in self._update_fields.items(): setattr(task, key, value) - self.context = task + self.model_context = task finally: self._update_fields = None @@ -126,29 +154,32 @@ class OperationTask(BaseTask, logger.LoggerMixin): return self._workflow_ctx @property - def context(self): + def model_context(self): """ Returns the task model in storage :return: task in storage """ return self.workflow_context.model.task.get(self._task_id) - @context.setter - def context(self, value): + @model_context.setter + def model_context(self, value): self.workflow_context.model.task.store(value) @property + def context(self): + return self._ctx + + @property def status(self): """ Returns the task status :return: task status """ - return self.context.status + return self.model_context.status @status.setter + @_locked def status(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") self._update_fields['status'] = value @property @@ -157,12 +188,11 @@ class OperationTask(BaseTask, logger.LoggerMixin): Returns when the task started :return: when task started """ - return self.context.started_at + return self.model_context.started_at @started_at.setter + @_locked def started_at(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") self._update_fields['started_at'] = value @property @@ -171,12 +201,11 @@ class OperationTask(BaseTask, logger.LoggerMixin): Returns when the task ended :return: when task ended """ - return self.context.ended_at + return self.model_context.ended_at @ended_at.setter + @_locked def ended_at(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") self._update_fields['ended_at'] = value @property @@ -185,16 +214,26 @@ class OperationTask(BaseTask, logger.LoggerMixin): Returns the retry count for the task :return: retry count """ - return self.context.retry_count + return self.model_context.retry_count @retry_count.setter + @_locked def retry_count(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") self._update_fields['retry_count'] = value + @property + def eta(self): + return self.model_context.eta + + @eta.setter + @_locked + def eta(self, value): + self._update_fields['eta'] = value + def __getattr__(self, attr): try: - return getattr(self.context, attr) + return getattr(self.model_context, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) + + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/__init__.py b/aria/workflows/executor/__init__.py index ae1e83e..352b33f 100644 --- a/aria/workflows/executor/__init__.py +++ b/aria/workflows/executor/__init__.py @@ -12,3 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from . import blocking, celery, multiprocess, thread \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/executor/blocking.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/blocking.py b/aria/workflows/executor/blocking.py index f072d8a..1647264 100644 --- a/aria/workflows/executor/blocking.py +++ b/aria/workflows/executor/blocking.py @@ -30,7 +30,7 @@ class CurrentThreadBlockingExecutor(BaseExecutor): self._task_started(task) try: task_func = module.load_attribute(task.operation_details['operation']) - task_func(**task.inputs) + task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/celery.py b/aria/workflows/executor/celery.py index a82a6b7..53ef41b 100644 --- a/aria/workflows/executor/celery.py +++ b/aria/workflows/executor/celery.py @@ -46,6 +46,7 @@ class CeleryExecutor(BaseExecutor): self._tasks[task.id] = task self._results[task.id] = self._app.send_task( task.operation_details['operation'], + ctx=task.model_context, kwargs=task.inputs, task_id=task.id, queue=self._get_queue(task)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/executor/multiprocess.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/multiprocess.py b/aria/workflows/executor/multiprocess.py index 4af08c0..c6423b6 100644 --- a/aria/workflows/executor/multiprocess.py +++ b/aria/workflows/executor/multiprocess.py @@ -46,6 +46,7 @@ class MultiprocessExecutor(BaseExecutor): self._tasks[task.id] = task self._pool.apply_async(_multiprocess_handler, args=( self._queue, + task.context, task.id, task.operation_details, task.inputs)) @@ -86,11 +87,11 @@ class _MultiprocessMessage(object): self.exception = exception -def _multiprocess_handler(queue, task_id, operation_details, operation_inputs): +def _multiprocess_handler(queue, ctx, task_id, operation_details, operation_inputs): queue.put(_MultiprocessMessage(type='task_started', task_id=task_id)) try: task_func = module.load_attribute(operation_details['operation']) - task_func(**operation_inputs) + task_func(ctx=ctx, **operation_inputs) queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id)) except BaseException as e: queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/aria/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/thread.py b/aria/workflows/executor/thread.py index 180c482..a6b616b 100644 --- a/aria/workflows/executor/thread.py +++ b/aria/workflows/executor/thread.py @@ -56,7 +56,7 @@ class ThreadExecutor(BaseExecutor): self._task_started(task) try: task_func = module.load_attribute(task.operation_details['operation']) - task_func(**task.inputs) + task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/context/__init__.py b/tests/context/__init__.py new file mode 100644 index 0000000..c9e296f --- /dev/null +++ b/tests/context/__init__.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import pytest +from aria.workflows.executor import blocking +from aria.workflows.core import engine + +from .. import mock + + +global_test_holder = {} + + [email protected] +def executor(): + # result = thread.ThreadExecutor() + result = blocking.CurrentThreadBlockingExecutor() + try: + yield result + finally: + result.close() + + [email protected] +def workflow_context(): + return mock.context.simple() + + +def op_path(func, module_path=None): + module_path = module_path or sys.modules[__name__].__name__ + return '{0}.{1}'.format(module_path, func.__name__) + + +def execute(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) + eng.execute() + + [email protected](autouse=True) +def cleanup(): + global_test_holder.clear() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/context/test_operation.py b/tests/context/test_operation.py new file mode 100644 index 0000000..a1fd2e0 --- /dev/null +++ b/tests/context/test_operation.py @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from aria import workflow, operation, context +from aria.workflows import api + +from .. import mock +from . import op_path, execute, executor, workflow_context, global_test_holder + + +def test_node_operation_task_execution(workflow_context, executor): + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + workflow_context.model.node.store(node) + workflow_context.model.node_instance.store(node_instance) + + node_instance = workflow_context.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + name = 'op_name' + operation_details = {'operation': op_path(my_operation, module_path=sys.modules[__name__].__name__)} + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + name=name, + operation_details=operation_details, + operation_executor=node_instance, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + operation_value = global_test_holder[name] + + assert isinstance(operation_value, context.operation.NodeOperationContext) + + # operation container based attributes + for key, value in node_instance.fields_dict.items(): + assert getattr(operation_value._operation_executor, key) == value + + # Task bases assertions + assert operation_value.task.operation_executor == node_instance + assert operation_value.task.name == name + assert operation_value.task.operation_details == operation_details + assert operation_value.task.inputs == inputs + + # Context based attributes (sugaring) + assert operation_value.node == node_instance.node + assert operation_value.node_instance == node_instance + + +def test_relationship_operation_task_execution(workflow_context, executor): + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = mock.models.get_dependency_node_instance() + relationship = mock.models.get_relationship(target=dependency_node) + relationship_instance = mock.models.get_relationship_instance( + target_instance=dependency_node_instance, + relationship=relationship) + dependent_node = mock.models.get_dependent_node() + dependent_node_instance = mock.models.get_dependent_node_instance( + relationship_instance=relationship_instance, + dependent_node=dependency_node) + workflow_context.model.node.store(dependency_node) + workflow_context.model.node_instance.store(dependency_node_instance) + workflow_context.model.relationship.store(relationship) + workflow_context.model.relationship_instance.store(relationship_instance) + workflow_context.model.node.store(dependent_node) + workflow_context.model.node_instance.store(dependent_node_instance) + + name = 'op_name' + operation_details = {'operation': op_path(my_operation, module_path=sys.modules[__name__].__name__)} + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + name=name, + operation_details=operation_details, + operation_executor=relationship_instance, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + operation_value = global_test_holder[name] + + assert isinstance(operation_value, context.operation.RelationshipOperationContext) + + # operation container based attributes + for key, value in relationship_instance.fields_dict.items(): + assert getattr(operation_value._operation_executor, key) == value + + # Task bases assertions + assert operation_value.task.operation_executor == relationship_instance + assert operation_value.task.name == name + assert operation_value.task.operation_details == operation_details + assert operation_value.task.inputs == inputs + + # Context based attributes (sugaring) + assert operation_value.target_node == dependency_node + assert operation_value.target_node_instance == dependency_node_instance + assert operation_value.relationship == relationship + assert operation_value.relationship_instance == relationship_instance + assert operation_value.node == dependent_node + assert operation_value.node_instance == dependent_node_instance + + +@operation +def my_operation(ctx, **_): + global_test_holder[ctx.name] = ctx + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/context/test_toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/context/test_toolbelt.py b/tests/context/test_toolbelt.py new file mode 100644 index 0000000..c9340d2 --- /dev/null +++ b/tests/context/test_toolbelt.py @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import pytest + +from aria.workflows import api +from aria import workflow, operation, context, exceptions +from aria.context.toolbelt import _RelationshipToolBelt, _OperationToolBelt + +from .. import mock +from . import ( + op_path, + execute, + executor, + workflow_context, + global_test_holder, + cleanup +) + + +def test_operation_tool_belt(workflow_context, executor): + + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = mock.models.get_dependency_node_instance() + relationship = mock.models.get_relationship(target=dependency_node) + relationship_instance = mock.models.get_relationship_instance( + target_instance=dependency_node_instance, relationship=relationship) + dependent_node = mock.models.get_dependent_node() + dependent_node_instance = mock.models.get_dependent_node_instance( + relationship_instance=relationship_instance, dependent_node=dependency_node) + workflow_context.model.node.store(dependency_node) + workflow_context.model.node_instance.store(dependency_node_instance) + workflow_context.model.relationship.store(relationship) + workflow_context.model.relationship_instance.store(relationship_instance) + workflow_context.model.node.store(dependent_node) + workflow_context.model.node_instance.store(dependent_node_instance) + + name = 'op_name' + operation_details = {'operation': op_path(node_operation, + module_path=sys.modules[__name__].__name__)} + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + name=name, + operation_details=operation_details, + operation_executor=dependency_node_instance, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + assert list(global_test_holder.get('relationships_to_me', [])) == list([relationship_instance]) + assert global_test_holder.get('ip') == dependency_node_instance.runtime_properties.get('ip') + + +def test_relationship_tool_belt(workflow_context, executor): + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = \ + mock.models.get_dependency_node_instance(dependency_node=dependency_node) + relationship = mock.models.get_relationship(target=dependency_node) + relationship_instance = \ + mock.models.get_relationship_instance(target_instance=dependency_node_instance, + relationship=relationship) + dependent_node = mock.models.get_dependent_node(relationship=relationship) + dependent_node_instance = \ + mock.models.get_dependent_node_instance(relationship_instance=relationship_instance, + dependent_node=dependent_node) + workflow_context.model.node.store(dependency_node) + workflow_context.model.node_instance.store(dependency_node_instance) + workflow_context.model.relationship.store(relationship) + workflow_context.model.relationship_instance.store(relationship_instance) + workflow_context.model.node.store(dependent_node) + workflow_context.model.node_instance.store(dependent_node_instance) + + name = 'op_name' + operation_details = {'operation': op_path(relationship_operation, + module_path=sys.modules[__name__].__name__)} + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + name=name, + operation_details=operation_details, + operation_executor=relationship_instance, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + assert isinstance(global_test_holder.get(name), _RelationshipToolBelt) + + +def test_wrong_model_toolbelt(): + with pytest.raises(exceptions.TaskException): + context.toolbelt(None) + + +@operation(toolbelt=True) +def node_operation(toolbelt, **_): + global_test_holder['relationships_to_me'] = list(toolbelt.relationships_to_me) + global_test_holder['ip'] = toolbelt.host_ip + + +@operation(toolbelt=True) +def relationship_operation(ctx, toolbelt, **_): + global_test_holder[ctx.name] = toolbelt + + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index a89612e..257cb3a 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -22,6 +22,8 @@ from ..storage import InMemoryModelDriver def simple(): storage = application_model_storage(InMemoryModelDriver()) storage.setup() + storage.blueprint.store(models.get_blueprint()) + storage.deployment.store(models.get_deployment()) return context.workflow.WorkflowContext( name='simple_context', model_storage=storage, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index 633adbb..b4c9d2e 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -24,11 +24,16 @@ BLUEPRINT_ID = 'test_blueprint_id' WORKFLOW_ID = 'test_workflow_id' EXECUTION_ID = 'test_execution_id' +DEPENDENCY_NODE_ID = 'dependency_node' +DEPENDENCY_NODE_INSTANCE_ID = 'dependency_node_instance' +DEPENDENT_NODE_ID = 'dependent_node' +DEPENDENT_NODE_INSTANCE_ID = 'dependent_node_instance' + def get_dependency_node(): return models.Node( - id='dependency_node', - host_id='dependency_node', + id=DEPENDENCY_NODE_ID, + host_id=DEPENDENCY_NODE_ID, blueprint_id=BLUEPRINT_ID, type='test_node_type', type_hierarchy=[], @@ -45,19 +50,20 @@ def get_dependency_node(): def get_dependency_node_instance(dependency_node=None): return models.NodeInstance( - id='dependency_node_instance', - host_id='dependency_node_instance', + id=DEPENDENCY_NODE_INSTANCE_ID, + host_id=DEPENDENCY_NODE_INSTANCE_ID, deployment_id=DEPLOYMENT_ID, - runtime_properties={}, + runtime_properties={'ip': '1.1.1.1'}, version=None, relationship_instances=[], node=dependency_node or get_dependency_node() ) -def get_relationship(target=None): +def get_relationship(source=None, target=None): return models.Relationship( - target_id=target.id or get_dependency_node().id, + source_id=source.id if source is not None else DEPENDENT_NODE_ID, + target_id=target.id if target is not None else DEPENDENCY_NODE_ID, source_interfaces={}, source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS), target_interfaces={}, @@ -68,10 +74,12 @@ def get_relationship(target=None): ) -def get_relationship_instance(target_instance=None, relationship=None): +def get_relationship_instance(source_instance=None, target_instance=None, relationship=None): return models.RelationshipInstance( - target_id=target_instance.id or get_dependency_node_instance().id, + target_id=target_instance.id if target_instance else DEPENDENCY_NODE_INSTANCE_ID, target_name='test_target_name', + source_id=source_instance.id if source_instance else DEPENDENT_NODE_INSTANCE_ID, + source_name='test_source_name', type='some_type', relationship=relationship or get_relationship(target_instance.node if target_instance else None) @@ -80,8 +88,8 @@ def get_relationship_instance(target_instance=None, relationship=None): def get_dependent_node(relationship=None): return models.Node( - id='dependent_node', - host_id='dependent_node', + id=DEPENDENT_NODE_ID, + host_id=DEPENDENT_NODE_ID, blueprint_id=BLUEPRINT_ID, type='test_node_type', type_hierarchy=[], @@ -96,10 +104,10 @@ def get_dependent_node(relationship=None): ) -def get_dependent_node_instance(relationship_instance, dependent_node=None): +def get_dependent_node_instance(relationship_instance=None, dependent_node=None): return models.NodeInstance( - id='dependent_node_instance', - host_id='dependent_node_instance', + id=DEPENDENT_NODE_INSTANCE_ID, + host_id=DEPENDENT_NODE_INSTANCE_ID, deployment_id=DEPLOYMENT_ID, runtime_properties={}, version=None, @@ -108,6 +116,18 @@ def get_dependent_node_instance(relationship_instance, dependent_node=None): ) +def get_blueprint(): + now = datetime.now() + return models.Blueprint( + plan={}, + id=BLUEPRINT_ID, + description=None, + created_at=now, + updated_at=now, + main_file_name='main_file_name' + ) + + def get_execution(): return models.Execution( id=EXECUTION_ID, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/storage/test_models.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py index fa7333b..69d8ce3 100644 --- a/tests/storage/test_models.py +++ b/tests/storage/test_models.py @@ -191,6 +191,7 @@ def _relationship(id=''): return Relationship( id='rel{0}'.format(id), target_id='target{0}'.format(id), + source_id='source{0}'.format(id), source_interfaces={}, source_operations={}, target_interfaces={}, @@ -244,6 +245,8 @@ def test_relationship_instance(): relationship_instances = [RelationshipInstance( id='rel{0}'.format(index), target_id='target_{0}'.format(index % 2), + source_id='source_{0}'.format(index % 2), + source_name='', target_name='', relationship=relationship, type='type{0}'.format(index)) for index in xrange(3)] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py index 7119529..ad534ed 100644 --- a/tests/workflows/api/test_task.py +++ b/tests/workflows/api/test_task.py @@ -71,12 +71,12 @@ class TestOperationTask(object): with context.workflow.current.push(workflow_context): model_task = api.task.OperationTask(name=name, operation_details=op_details, - node_instance=node_instance, + operation_executor=node_instance, inputs=inputs) assert model_task.name == name assert model_task.operation_details == op_details - assert model_task.node_instance == node_instance + assert model_task.operation_executor == node_instance assert model_task.inputs == inputs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/workflows/core/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_executor.py b/tests/workflows/core/test_executor.py index 8ec0303..91213e5 100644 --- a/tests/workflows/core/test_executor.py +++ b/tests/workflows/core/test_executor.py @@ -81,15 +81,15 @@ class TestExecutor(object): self.executor.close() -def mock_successful_task(): +def mock_successful_task(**_): pass -def mock_failing_task(): +def mock_failing_task(**_): raise MockException -def mock_task_with_input(input): +def mock_task_with_input(input, **_): raise MockException(input) if app: @@ -104,7 +104,7 @@ class MockException(Exception): class MockTask(object): - def __init__(self, func, inputs=None): + def __init__(self, func, inputs=None, ctx=None): self.states = [] self.exception = None self.id = str(uuid.uuid4()) @@ -114,6 +114,7 @@ class MockTask(object): self.logger = logging.getLogger() self.name = name self.inputs = inputs or {} + self.context = ctx or None for state in models.Task.STATES: setattr(self, state.upper(), state) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task.py b/tests/workflows/core/test_task.py new file mode 100644 index 0000000..569ee70 --- /dev/null +++ b/tests/workflows/core/test_task.py @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest +from datetime import datetime, timedelta + +from aria.context import workflow as workflow_context +from aria.workflows import ( + api, + core, + exceptions, +) + +from ... import mock + + [email protected] +def ctx(): + simple_context = mock.context.simple() + + blueprint = mock.models.get_blueprint() + deployment = mock.models.get_deployment() + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + execution = mock.models.get_execution() + + simple_context.model.blueprint.store(blueprint) + simple_context.model.deployment.store(deployment) + simple_context.model.node.store(node) + simple_context.model.node_instance.store(node_instance) + simple_context.model.execution.store(execution) + + return simple_context + + +class TestOperationTask(object): + + def _create_operation_task(self, ctx, node_instance): + with workflow_context.current.push(ctx): + api_task = api.task.OperationTask( + name='ripe', + operation_details={'operations': 'aria.tests.workflows.core.test_task.foo'}, + operation_executor=node_instance, + ) + + core_task = core.task.OperationTask(api_task=api_task) + + return api_task, core_task + + def test_operation_task_creation(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + api_task, core_task = self._create_operation_task(ctx, node_instance) + storage_task = ctx.model.task.get(core_task.id) + + assert core_task.model_context == storage_task + assert core_task.name == api_task.name + assert core_task.operation_details == api_task.operation_details + assert core_task.operation_executor == api_task.operation_executor == node_instance + assert core_task.inputs == api_task.inputs == storage_task.inputs + + def test_operation_task_edit_locked_attribute(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + + _, core_task = self._create_operation_task(ctx, node_instance) + now = datetime.utcnow() + with pytest.raises(exceptions.TaskException): + core_task.status = core_task.STARTED + with pytest.raises(exceptions.TaskException): + core_task.started_at = now + with pytest.raises(exceptions.TaskException): + core_task.ended_at = now + with pytest.raises(exceptions.TaskException): + core_task.retry_count = 2 + with pytest.raises(exceptions.TaskException): + core_task.eta = now + + def test_operation_task_edit_attributes(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + + _, core_task = self._create_operation_task(ctx, node_instance) + future_time = datetime.utcnow() + timedelta(seconds=3) + + with core_task.update(): + core_task.status = core_task.STARTED + core_task.started_at = future_time + core_task.ended_at = future_time + core_task.retry_count = 2 + core_task.eta = future_time + assert core_task.status != core_task.STARTED + assert core_task.started_at != future_time + assert core_task.ended_at != future_time + assert core_task.retry_count != 2 + assert core_task.eta != future_time + + assert core_task.status == core_task.STARTED + assert core_task.started_at == future_time + assert core_task.ended_at == future_time + assert core_task.retry_count == 2 + assert core_task.eta == future_time http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py index 75e825f..5a5b711 100644 --- a/tests/workflows/core/test_task_graph_into_exececution_graph.py +++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py @@ -25,8 +25,12 @@ def test_task_graph_into_execution_graph(): task_context = mock.context.simple() node = mock.models.get_dependency_node() node_instance = mock.models.get_dependency_node_instance() + deployment = mock.models.get_deployment() + execution = mock.models.get_execution() task_context.model.node.store(node) task_context.model.node_instance.store(node_instance) + task_context.model.deployment.store(deployment) + task_context.model.execution.store(execution) def sub_workflow(name, **_): return api.task_graph.TaskGraph(name) @@ -89,7 +93,7 @@ def _assert_execution_is_api_task(execution_task, api_task): assert execution_task.id == api_task.id assert execution_task.name == api_task.name assert execution_task.operation_details == api_task.operation_details - assert execution_task.node_instance == api_task.node_instance + assert execution_task.operation_executor == api_task.operation_executor assert execution_task.inputs == api_task.inputs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03a9de70/tests/workflows/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py index ea703f5..f09c363 100644 --- a/tests/workflows/test_engine.py +++ b/tests/workflows/test_engine.py @@ -19,11 +19,10 @@ import pytest import aria from aria import events -from aria import workflow +from aria import workflow, operation from aria import context from aria.storage import models from aria.workflows import exceptions -from aria.workflows.executor import thread from aria.workflows.core import engine from aria.workflows import api @@ -95,12 +94,12 @@ class TestEngine(object): eng.execute() @staticmethod - def _op(func, ctx, inputs=None): + def _op(func, ctx, inputs=None, task_name=None): return api.task.OperationTask( - name='task', - operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( - name=func.__name__)}, - node_instance=ctx.model.node_instance.get('dependency_node_instance'), + name=task_name or 'task', + operation_details={'operation': 'tests.workflows.test_engine.{name}'. + format(name=func.__name__)}, + operation_executor=ctx.model.node_instance.get('dependency_node_instance'), inputs=inputs ) @@ -141,7 +140,9 @@ class TestEngine(object): @pytest.fixture(scope='function') def executor(self): - result = thread.ThreadExecutor() + from aria.workflows.executor import blocking + result = blocking.CurrentThreadBlockingExecutor() + # result = thread.ThreadExecutor() try: yield result finally: @@ -151,13 +152,9 @@ class TestEngine(object): def workflow_context(self): model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver()) model_storage.setup() - deployment = models.Deployment( - id='d1', - blueprint_id='b1', - description=None, - created_at=datetime.now(), - updated_at=datetime.now(), - workflows={}) + blueprint = mock.models.get_blueprint() + deployment = mock.models.get_deployment() + model_storage.blueprint.store(blueprint) model_storage.deployment.store(deployment) node = mock.models.get_dependency_node() node_instance = mock.models.get_dependency_node_instance(node) @@ -174,14 +171,17 @@ class TestEngine(object): return result -def mock_success_task(): +@operation +def mock_success_task(**_): pass -def mock_failed_task(): +@operation +def mock_failed_task(**_): raise RuntimeError -def mock_ordered_task(counter): +@operation +def mock_ordered_task(counter, **_): invocations = global_test_holder.setdefault('invocations', []) invocations.append(counter)
