Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-9-API-for-operation-context b71174f26 -> c3c0aa9b0
wip2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/84d48402 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/84d48402 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/84d48402 Branch: refs/heads/ARIA-9-API-for-operation-context Commit: 84d48402ab11601c99c0fa1bce63a5576c2df1d6 Parents: b71174f Author: mxmrlv <[email protected]> Authored: Thu Nov 10 19:10:17 2016 +0200 Committer: mxmrlv <[email protected]> Committed: Thu Nov 10 19:10:17 2016 +0200 ---------------------------------------------------------------------- aria/context/__init__.py | 107 +--------------- aria/context/common.py | 121 +++++++++++++++++++ aria/context/operation.py | 7 +- aria/context/toolbelt.py | 56 +++++++++ aria/context/workflow.py | 2 +- aria/decorators.py | 11 +- aria/storage/models.py | 11 +- aria/workflows/core/task.py | 39 ++++-- aria/workflows/executor/multiprocess.py | 2 +- aria/workflows/executor/thread.py | 2 +- tests/context/__init__.py | 56 +++++++++ tests/context/operation.py | 121 +++++++++++++++++++ tests/context/toolbelt.py | 69 +++++++++++ tests/mock/context.py | 1 + tests/mock/models.py | 2 +- tests/test_context.py | 56 --------- tests/workflows/core/test_executor.py | 6 +- tests/workflows/core/test_task.py | 105 ++++++++-------- .../test_task_graph_into_exececution_graph.py | 2 +- tests/workflows/test_engine.py | 24 ++-- 20 files changed, 547 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/__init__.py ---------------------------------------------------------------------- diff --git a/aria/context/__init__.py b/aria/context/__init__.py index aa68a36..fa5fbda 100644 --- a/aria/context/__init__.py +++ b/aria/context/__init__.py @@ -16,109 +16,6 @@ """ Provides contexts to workflow and operation 
""" -from uuid import uuid4 -from .. import logger -from ..tools.lru_cache import lru_cache - - -class BaseContext(logger.LoggerMixin): - """ - Context object used during workflow creation and execution - """ - - def __init__( - self, - name, - model_storage, - resource_storage, - deployment_id, - workflow_id, - execution_id=None, - parameters=None, - **kwargs): - super(BaseContext, self).__init__(**kwargs) - self.name = name - self.id = str(uuid4()) - self.model = model_storage - self.resource = resource_storage - self.deployment_id = deployment_id - self.workflow_id = workflow_id - self.execution_id = execution_id or str(uuid4()) - self.parameters = parameters or {} - - def __repr__(self): - return ( - '{name}(deployment_id={self.deployment_id}, ' - 'workflow_id={self.workflow_id}, ' - 'execution_id={self.execution_id})'.format( - name=self.__class__.__name__, self=self)) - - @property - def blueprint_id(self): - """ - The blueprint id - """ - return self.deployment.blueprint_id - - @property - @lru_cache() - def blueprint(self): - """ - The blueprint model - """ - return self.model.blueprint.get(self.blueprint_id) - - @property - @lru_cache() - def deployment(self): - """ - The deployment model - """ - return self.model.deployment.get(self.deployment_id) - - @property - def execution(self): - """ - The execution model - """ - return self.model.execution.get(self.execution_id) - - @execution.setter - def execution(self, value): - """ - Store the execution in the model storage - """ - self.model.execution.store(value) - - def download_blueprint_resource(self, destination, path=None): - """ - Download a blueprint resource from the resource storage - """ - return self.resource.blueprint.download( - entry_id=self.blueprint_id, - destination=destination, - path=path) - - def download_deployment_resource(self, destination, path=None): - """ - Download a deployment resource from the resource storage - """ - return self.resource.deployment.download( - 
entry_id=self.deployment_id, - destination=destination, - path=path) - - @lru_cache() - def get_deployment_resource_data(self, path=None): - """ - Read a deployment resource as string from the resource storage - """ - return self.resource.deployment.data(entry_id=self.deployment_id, path=path) - - @lru_cache() - def get_blueprint_resource_data(self, path=None): - """ - Read a blueprint resource as string from the resource storage - """ - return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path) +from . import workflow, operation +from .toolbelt import tool_belt http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/common.py ---------------------------------------------------------------------- diff --git a/aria/context/common.py b/aria/context/common.py new file mode 100644 index 0000000..9f69b46 --- /dev/null +++ b/aria/context/common.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from uuid import uuid4 + +from .. 
import logger +from ..tools.lru_cache import lru_cache + + +class BaseContext(logger.LoggerMixin): + """ + Context object used during workflow creation and execution + """ + + def __init__( + self, + name, + model_storage, + resource_storage, + deployment_id, + workflow_id, + execution_id=None, + parameters=None, + **kwargs): + super(BaseContext, self).__init__(**kwargs) + self.name = name + self.id = str(uuid4()) + self.model = model_storage + self.resource = resource_storage + self.deployment_id = deployment_id + self.workflow_id = workflow_id + self.execution_id = execution_id or str(uuid4()) + self.parameters = parameters or {} + + def __repr__(self): + return ( + '{name}(deployment_id={self.deployment_id}, ' + 'workflow_id={self.workflow_id}, ' + 'execution_id={self.execution_id})'.format( + name=self.__class__.__name__, self=self)) + + @property + def blueprint_id(self): + """ + The blueprint id + """ + return self.deployment.blueprint_id + + @property + @lru_cache() + def blueprint(self): + """ + The blueprint model + """ + return self.model.blueprint.get(self.blueprint_id) + + @property + @lru_cache() + def deployment(self): + """ + The deployment model + """ + return self.model.deployment.get(self.deployment_id) + + @property + def execution(self): + """ + The execution model + """ + return self.model.execution.get(self.execution_id) + + @execution.setter + def execution(self, value): + """ + Store the execution in the model storage + """ + self.model.execution.store(value) + + def download_blueprint_resource(self, destination, path=None): + """ + Download a blueprint resource from the resource storage + """ + return self.resource.blueprint.download( + entry_id=self.blueprint_id, + destination=destination, + path=path) + + def download_deployment_resource(self, destination, path=None): + """ + Download a deployment resource from the resource storage + """ + return self.resource.deployment.download( + entry_id=self.deployment_id, + destination=destination, 
+ path=path) + + @lru_cache() + def get_deployment_resource_data(self, path=None): + """ + Read a deployment resource as string from the resource storage + """ + return self.resource.deployment.data(entry_id=self.deployment_id, path=path) + + @lru_cache() + def get_blueprint_resource_data(self, path=None): + """ + Read a blueprint resource as string from the resource storage + """ + return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/context/operation.py b/aria/context/operation.py index 13764d8..908287d 100644 --- a/aria/context/operation.py +++ b/aria/context/operation.py @@ -18,7 +18,7 @@ Workflow and operation contexts """ -from . import BaseContext +from .common import BaseContext class BaseOperationContext(BaseContext): @@ -36,6 +36,7 @@ class BaseOperationContext(BaseContext): execution_id=workflow_context.execution_id, parameters=workflow_context.parameters, **kwargs) + self._workflow_context = workflow_context self._task_model = task_context def __repr__(self): @@ -45,6 +46,10 @@ class BaseOperationContext(BaseContext): return '{name}({0})'.format(details, name=self.name) @property + def workflow(self): + return self._workflow_context + + @property def task(self): return self._task_model http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/context/toolbelt.py b/aria/context/toolbelt.py new file mode 100644 index 0000000..59e4d3c --- /dev/null +++ b/aria/context/toolbelt.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from contextlib import contextmanager + +from . import operation + + +class _ToolBelt(object): + + def __init__(self): + self._op_context = None + self._workflow_context = None + + @property + def node_instances_connected_to_me(self): + assert isinstance(self._op_context, operation.NodeOperationContext) + for node_instance in self._workflow_context.node_instances: + for relationship_instance in node_instance.relationship_instances: + if relationship_instance.target_id == self._op_context.node_instance.id: + yield node_instance + + @property + def relationships_to_me(self): + assert isinstance(self._op_context, operation.NodeOperationContext) + for node_instance in self._workflow_context.node_instances: + for relationship_instance in node_instance.relationship_instances: + if relationship_instance.target_id == self._op_context.node_instance.id: + yield relationship_instance + + @contextmanager + def use(self, operation_context): + assert isinstance(operation_context, operation.BaseOperationContext) + self._op_context = operation_context + self._workflow_context = operation_context.workflow + try: + yield self + finally: + self._op_context = None + self._workflow_context = None + +_toolbelt = _ToolBelt() + + +tool_belt = _toolbelt.use http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/workflow.py ---------------------------------------------------------------------- 
diff --git a/aria/context/workflow.py b/aria/context/workflow.py index ef2feb6..0b5b2a8 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -22,7 +22,7 @@ from contextlib import contextmanager from aria import exceptions -from . import BaseContext +from .common import BaseContext class ContextException(exceptions.AriaError): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/decorators.py ---------------------------------------------------------------------- diff --git a/aria/decorators.py b/aria/decorators.py index e41ed81..d826c2d 100644 --- a/aria/decorators.py +++ b/aria/decorators.py @@ -50,17 +50,20 @@ def workflow(func=None, suffix_template=''): return _wrapper -def operation(func=None, suffix_template=''): +def operation(func=None, tool_belt=False, suffix_template=''): """ Operation decorator """ if func is None: - return partial(operation, suffix_template=suffix_template) + return partial(operation, suffix_template=suffix_template, tool_belt=tool_belt) @wraps(func) def _wrapper(**func_kwargs): - validate_function_arguments(func, func_kwargs) - return func(**func_kwargs) + with context.tool_belt(func_kwargs.get('ctx')) as tb: + if tool_belt: + func_kwargs.setdefault('tool_belt', tb) + validate_function_arguments(func, func_kwargs) + return func(**func_kwargs) return _wrapper http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 686ca42..3b36818 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -263,7 +263,11 @@ class Node(Model): # todo: maybe add here Exception if isn't exists (didn't yield one's) -class RelationshipInstance(Model): +class Instance(Model): + id = Field(type=basestring, default=uuid_generator) + + +class RelationshipInstance(Instance): """ A Model which represents a relationship instance """ @@ 
-274,7 +278,7 @@ class RelationshipInstance(Model): relationship = PointerField(type=Relationship) -class NodeInstance(Model): +class NodeInstance(Instance): """ A Model which represents a node instance """ @@ -401,6 +405,5 @@ class Task(Model): # Operation specific fields name = Field(type=basestring) operation_details = Field(type=dict) - # TODO: this might need to be remodeled - operation_container = PointerField(type=NodeInstance) + operation_container = Field() inputs = Field(type=dict, default=lambda: {}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py index 4fb3766..075ef8a 100644 --- a/aria/workflows/core/task.py +++ b/aria/workflows/core/task.py @@ -18,6 +18,8 @@ Workflow tasks """ from contextlib import contextmanager from datetime import datetime +from functools import partial +from functools import wraps from ... import logger from ...storage import models @@ -25,6 +27,18 @@ from ...context import operation as operation_context from .. 
import exceptions +def _locked(func=None): + if func is None: + return partial(_locked, func=_locked) + + @wraps(func) + def _wrapper(self, value, **kwargs): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + return func(self, value, **kwargs) + return _wrapper + + class BaseTask(logger.LoggerMixin): """ Base class for Task objects @@ -97,7 +111,7 @@ class OperationTask(BaseTask): operation_container=api_task.operation_container, inputs=api_task.inputs, status=task_model.PENDING, - execution_id=self.workflow_context.execution.id, + execution_id=self.workflow_context.execution_id, max_retries=self.workflow_context.parameters.get('max_retries', 1), ) @@ -164,9 +178,8 @@ class OperationTask(BaseTask): return self.model_context.status @status.setter + @_locked def status(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") self._update_fields['status'] = value @property @@ -178,9 +191,8 @@ class OperationTask(BaseTask): return self.model_context.started_at @started_at.setter + @_locked def started_at(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") self._update_fields['started_at'] = value @property @@ -192,9 +204,8 @@ class OperationTask(BaseTask): return self.model_context.ended_at @ended_at.setter + @_locked def ended_at(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") self._update_fields['ended_at'] = value @property @@ -206,13 +217,23 @@ class OperationTask(BaseTask): return self.model_context.retry_count @retry_count.setter + @_locked def retry_count(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") self._update_fields['retry_count'] = value + @property + def eta(self): + return self.model_context.eta + + @eta.setter + @_locked + def eta(self, value): + 
self._update_fields['eta'] = value + def __getattr__(self, attr): try: return getattr(self.model_context, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) + + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/workflows/executor/multiprocess.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/multiprocess.py b/aria/workflows/executor/multiprocess.py index 063f834..c6423b6 100644 --- a/aria/workflows/executor/multiprocess.py +++ b/aria/workflows/executor/multiprocess.py @@ -46,7 +46,7 @@ class MultiprocessExecutor(BaseExecutor): self._tasks[task.id] = task self._pool.apply_async(_multiprocess_handler, args=( self._queue, - task.model_context, + task.context, task.id, task.operation_details, task.inputs)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/thread.py b/aria/workflows/executor/thread.py index b2dc278..a6b616b 100644 --- a/aria/workflows/executor/thread.py +++ b/aria/workflows/executor/thread.py @@ -56,7 +56,7 @@ class ThreadExecutor(BaseExecutor): self._task_started(task) try: task_func = module.load_attribute(task.operation_details['operation']) - task_func(ctx=task.model_context, **task.inputs) + task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/context/__init__.py b/tests/context/__init__.py new file mode 100644 index 0000000..89f55cb --- /dev/null +++ b/tests/context/__init__.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import pytest +from aria.workflows.executor import blocking +from aria.workflows.core import engine + +from .. import mock + + +global_test_holder = {} + + +@pytest.fixture +def executor(): + # result = thread.ThreadExecutor() + result = blocking.CurrentThreadBlockingExecutor() + try: + yield result + finally: + result.close() + + +@pytest.fixture +def workflow_context(): + return mock.context.simple() + + +def op_path(func, module_path=None): + module_path = module_path or sys.modules[__name__].__name__ + return '{0}.{1}'.format(module_path, func.__name__) + + +def execute(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) + eng.execute() + + +@pytest.fixture(autouse=True) +def cleanup(): + global_test_holder.clear() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/context/operation.py ---------------------------------------------------------------------- diff --git a/tests/context/operation.py b/tests/context/operation.py new file mode 100644 index 0000000..7422ce6 --- /dev/null +++ b/tests/context/operation.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) 
under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from aria import workflow, operation +from aria.workflows import api + +from .. import mock +from . import op_path, execute, executor, workflow_context, global_test_holder + + +def test_node_operation_task_execution(workflow_context, executor): + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + workflow_context.model.node.store(node) + workflow_context.model.node_instance.store(node_instance) + + node_instance = workflow_context.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + name = 'op_name' + operation_details = {'operation': op_path(foo, module_path=sys.modules[__name__].__name__)} + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + name=name, + operation_details=operation_details, + operation_container=node_instance, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + operation_value = global_test_holder[name] + + # operation container based attributes + for key, value in node_instance.fields_dict.items(): + assert getattr(operation_value.operation_container, key) == value + + # Task bases assertions + 
assert operation_value.task.operation_container == node_instance + assert operation_value.task.name == name + assert operation_value.task.operation_details == operation_details + assert operation_value.task.inputs == inputs + + # Context based attributes (sugaring) + assert operation_value.node == node_instance.node + assert operation_value.node_instance == node_instance + + +def test_relationship_operation_task_execution(workflow_context, executor): + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = mock.models.get_dependency_node_instance() + relationship = mock.models.get_relationship(dependency_node) + relationship_instance = mock.models.get_relationship_instance(dependency_node_instance, + relationship) + dependent_node = mock.models.get_dependent_node() + dependent_node_instance = mock.models.get_dependent_node_instance(relationship_instance, + dependency_node) + workflow_context.model.node.store(dependency_node) + workflow_context.model.node_instance.store(dependency_node_instance) + workflow_context.model.relationship.store(relationship) + workflow_context.model.relationship_instance.store(relationship_instance) + workflow_context.model.node.store(dependent_node) + workflow_context.model.node_instance.store(dependent_node_instance) + + name = 'op_name' + operation_details = {'operation': op_path(foo, module_path=sys.modules[__name__].__name__)} + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + name=name, + operation_details=operation_details, + operation_container=relationship_instance, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + operation_value = global_test_holder[name] + + # operation container based attributes + for key, value in relationship_instance.fields_dict.items(): + assert getattr(operation_value.operation_container, key) == value + + # Task bases assertions + 
assert operation_value.task.operation_container == relationship_instance + assert operation_value.task.name == name + assert operation_value.task.operation_details == operation_details + assert operation_value.task.inputs == inputs + + # Context based attributes (sugaring) + assert operation_value.target_node == dependency_node + assert operation_value.target_node_instance == dependency_node_instance + assert operation_value.relationship == relationship + assert operation_value.relationship_instance == relationship_instance + + +@operation +def foo(ctx, **_): + global_test_holder[ctx.name] = ctx + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/context/toolbelt.py b/tests/context/toolbelt.py new file mode 100644 index 0000000..b46fcad --- /dev/null +++ b/tests/context/toolbelt.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from aria.workflows import api +from aria import workflow, operation + +from .. import mock +from . 
import op_path, execute, executor, workflow_context, global_test_holder + + +def test_operation_tool_belt(workflow_context, executor): + + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = mock.models.get_dependency_node_instance() + relationship = mock.models.get_relationship(dependency_node) + relationship_instance = mock.models.get_relationship_instance(dependency_node_instance, + relationship) + dependent_node = mock.models.get_dependent_node() + dependent_node_instance = mock.models.get_dependent_node_instance(relationship_instance, + dependency_node) + workflow_context.model.node.store(dependency_node) + workflow_context.model.node_instance.store(dependency_node_instance) + workflow_context.model.relationship.store(relationship) + workflow_context.model.relationship_instance.store(relationship_instance) + workflow_context.model.node.store(dependent_node) + workflow_context.model.node_instance.store(dependent_node_instance) + + node_instance = workflow_context.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + name = 'op_name' + operation_details = {'operation': op_path(foo, module_path=sys.modules[__name__].__name__)} + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + name=name, + operation_details=operation_details, + operation_container=node_instance, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + assert list(global_test_holder.get('connected_to_me', [])) == list([dependent_node_instance]) + assert list(global_test_holder.get('relationships_to_me', [])) == list([relationship_instance]) + + +@operation(tool_belt=True) +def foo(ctx, tool_belt, **_): + global_test_holder['connected_to_me'] = list(tool_belt.node_instances_connected_to_me) + global_test_holder['relationships_to_me'] = list(tool_belt.relationships_to_me) + global_test_holder[ctx.name] = tool_belt + 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index a89612e..4b1f34b 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -22,6 +22,7 @@ from ..storage import InMemoryModelDriver def simple(): storage = application_model_storage(InMemoryModelDriver()) storage.setup() + storage.deployment.store(models.get_deployment()) return context.workflow.WorkflowContext( name='simple_context', model_storage=storage, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index bf40039..66a902d 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -62,7 +62,7 @@ def get_dependency_node_instance(dependency_node=None): def get_relationship(target=None): return models.Relationship( - target_id=target.id or get_dependency_node().id, + target_id=target.id if target is not None else get_dependency_node().id, source_interfaces={}, source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS), target_interfaces={}, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/test_context.py ---------------------------------------------------------------------- diff --git a/tests/test_context.py b/tests/test_context.py deleted file mode 100644 index b61c5c6..0000000 --- a/tests/test_context.py +++ /dev/null @@ -1,56 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from aria.context import workflow, operation - -from . import mock - - -@pytest.fixture -def ctx(): - simple_context = mock.context.simple() - - blueprint = mock.models.get_blueprint() - deployment = mock.models.get_deployment() - node = mock.models.get_dependency_node() - node_instance = mock.models.get_dependency_node_instance(node) - execution = mock.models.get_execution() - - simple_context.model.blueprint.store(blueprint) - simple_context.model.deployment.store(deployment) - simple_context.model.node.store(node) - simple_context.model.node_instance.store(node_instance) - simple_context.model.execution.store(execution) - - return simple_context - - -class TestOperationContext(object): - - def test_node_operation(self, ctx): - node = ctx.model.node.get(mock.models.DEPENDENCY_NODE_ID) - node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) - with workflow.current.push(ctx): - operation_context = operation.NodeOperationContext( - name='test_operation_name', - operation_details={'dits': True}, - operation_container=node_instance, - inputs={'inputs': True}, - ) - # operation host attributes - assert operation_context.node == node - assert operation_context.node_instance == node_instance \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/workflows/core/test_executor.py 
---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_executor.py b/tests/workflows/core/test_executor.py index 3445347..91213e5 100644 --- a/tests/workflows/core/test_executor.py +++ b/tests/workflows/core/test_executor.py @@ -81,15 +81,15 @@ class TestExecutor(object): self.executor.close() -def mock_successful_task(ctx): +def mock_successful_task(**_): pass -def mock_failing_task(ctx): +def mock_failing_task(**_): raise MockException -def mock_task_with_input(ctx, input): +def mock_task_with_input(input, **_): raise MockException(input) if app: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task.py b/tests/workflows/core/test_task.py index 7731165..451bce3 100644 --- a/tests/workflows/core/test_task.py +++ b/tests/workflows/core/test_task.py @@ -12,24 +12,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import pytest +from datetime import datetime, timedelta -from aria import decorators from aria.context import workflow as workflow_context from aria.workflows import ( api, core, + exceptions, ) -from aria.workflows.executor import blocking -from aria.storage import models from ... 
import mock -global_test_holder = {} - - @pytest.fixture def ctx(): simple_context = mock.context.simple() @@ -49,16 +44,9 @@ def ctx(): return simple_context -@pytest.fixture(autouse=True) -def clear_global_test_operation_holder(): - global_test_holder.clear() - - class TestOperationTask(object): - def test_operation_task_creation(self, ctx): - node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) - + def _create_operation_task(self, ctx, node_instance): with workflow_context.current.push(ctx): api_task = api.task.OperationTask( name='ripe', @@ -68,50 +56,55 @@ class TestOperationTask(object): core_task = core.task.OperationTask(api_task=api_task) - storage_task = ctx.model.task.get(core_task.id) + return api_task, core_task - assert core_task.model_context == storage_task - assert core_task.name == api_task.name - assert core_task.operation_details == api_task.operation_details - assert core_task.operation_container == api_task.operation_container == node_instance - assert core_task.inputs == api_task.inputs == storage_task.inputs - - def test_operation_task_execution(self, ctx): + def test_operation_task_creation(self, ctx): node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + api_task, core_task = self._create_operation_task(ctx, node_instance) + storage_task = ctx.model.task.get(core_task.id) - operation_name = 'silly_operation' - operation_details = {'operation': 'tests.workflows.core.test_task.foo'} - inputs = {'putput': True} - - @decorators.workflow - def basic_workflow(graph, **_): - graph.add_tasks( - api.task.OperationTask( - name=operation_name, - operation_details=operation_details, - operation_container=node_instance, - inputs=inputs, - ) - ) - - engine = core.engine.Engine(executor=blocking.CurrentThreadBlockingExecutor(), - workflow_context=ctx, - tasks_graph=basic_workflow(ctx=ctx)) - engine.execute() - operation_value = global_test_holder[operation_name] + assert
core_task.model_context == storage_task + assert core_task.name == api_task.name + assert core_task.operation_details == api_task.operation_details + assert core_task.operation_container == api_task.operation_container == node_instance + assert core_task.inputs == api_task.inputs == storage_task.inputs - assert operation_value.node_instance == node_instance - assert operation_value.node == node_instance.node - # operation_host attributes - for key, value in node_instance.fields_dict.items(): - assert getattr(operation_value.operation_container, key) == value - - assert operation_value.task.operation_container == node_instance - assert operation_value.task.name == operation_name - assert operation_value.task.operation_details == operation_details - assert operation_value.task.inputs == inputs + def test_operation_task_edit_locked_attribute(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) -@decorators.operation -def foo(ctx, **_): - global_test_holder[ctx.name] = ctx + _, core_task = self._create_operation_task(ctx, node_instance) + now = datetime.utcnow() + with pytest.raises(exceptions.TaskException): + core_task.status = core_task.STARTED + with pytest.raises(exceptions.TaskException): + core_task.started_at = now + with pytest.raises(exceptions.TaskException): + core_task.ended_at = now + with pytest.raises(exceptions.TaskException): + core_task.retry_count = 2 + with pytest.raises(exceptions.TaskException): + core_task.eta = now + + def test_operation_task_edit_attributes(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + _, core_task = self._create_operation_task(ctx, node_instance) + future_time = datetime.utcnow() + timedelta(seconds=3) + + with core_task.update(): + core_task.status = core_task.STARTED + core_task.started_at = future_time + core_task.ended_at = future_time + core_task.retry_count = 2 + core_task.eta = future_time + assert core_task.status !=
core_task.STARTED + assert core_task.started_at != future_time + assert core_task.ended_at != future_time + assert core_task.retry_count != 2 + assert core_task.eta != future_time + + assert core_task.status == core_task.STARTED + assert core_task.started_at == future_time + assert core_task.ended_at == future_time + assert core_task.retry_count == 2 + assert core_task.eta == future_time http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py index 992b0cc..170fbdb 100644 --- a/tests/workflows/core/test_task_graph_into_exececution_graph.py +++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py @@ -93,7 +93,7 @@ def _assert_execution_is_api_task(execution_task, api_task): assert execution_task.id == api_task.id assert execution_task.name == api_task.name assert execution_task.operation_details == api_task.operation_details - assert execution_task.operation_host == api_task.operation_host + assert execution_task.operation_container == api_task.operation_container assert execution_task.inputs == api_task.inputs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/workflows/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py index f016d12..72c70e5 100644 --- a/tests/workflows/test_engine.py +++ b/tests/workflows/test_engine.py @@ -19,11 +19,10 @@ import pytest import aria from aria import events -from aria import workflow +from aria import workflow, operation from aria import context from aria.storage import models from aria.workflows import exceptions -from aria.workflows.executor import thread from aria.workflows.core import engine 
from aria.workflows import api @@ -95,11 +94,11 @@ class TestEngine(object): eng.execute() @staticmethod - def _op(func, ctx, inputs=None): + def _op(func, ctx, inputs=None, task_name=None): return api.task.OperationTask( - name='task', - operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( - name=func.__name__)}, + name=task_name or 'task', + operation_details={'operation': 'tests.workflows.test_engine.{name}'. + format(name=func.__name__)}, operation_container=ctx.model.node_instance.get('dependency_node_instance'), inputs=inputs ) @@ -141,7 +140,9 @@ class TestEngine(object): @pytest.fixture(scope='function') def executor(self): - result = thread.ThreadExecutor() + from aria.workflows.executor import blocking + result = blocking.CurrentThreadBlockingExecutor() + # result = thread.ThreadExecutor() try: yield result finally: @@ -174,14 +175,17 @@ class TestEngine(object): return result -def mock_success_task(): +@operation +def mock_success_task(**_): pass -def mock_failed_task(): +@operation +def mock_failed_task(**_): raise RuntimeError -def mock_ordered_task(counter): +@operation +def mock_ordered_task(counter, **_): invocations = global_test_holder.setdefault('invocations', []) invocations.append(counter)
