ARIA-3 Create an API for building workflows ARIA-4 Create an API for the task graph ARIA-5 Adapt workflow API uses to modified API
An API for creating workflows. Users can build graphs of tasks and set dependencies in between tasks to execute them in a specific order. Additional changes: - Remodeling for engine and user tasks. - Remodeling for Operation into Task in the storage. - Minimal reorganization of a few test modules, so they are now using the same file system hierarchy as the modules which they test. Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8947f72c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8947f72c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8947f72c Branch: refs/heads/master Commit: 8947f72cb744a9b9fe79c80ebd2f2ef7fbb68251 Parents: 47eaf04 Author: Ran Ziv <[email protected]> Authored: Fri Oct 21 13:21:25 2016 +0300 Committer: mxmrlv <[email protected]> Committed: Mon Nov 7 18:07:43 2016 +0200 ---------------------------------------------------------------------- .travis.yml | 1 + aria/.pylintrc | 17 +- aria/__init__.py | 2 +- aria/cli/commands.py | 2 +- aria/context/__init__.py | 20 + aria/context/operation.py | 68 ++ aria/context/workflow.py | 190 +++++ aria/contexts.py | 211 ------ aria/decorators.py | 49 +- aria/events/builtin_event_handler.py | 30 +- aria/events/workflow_engine_event_handler.py | 20 +- aria/logger.py | 6 +- aria/storage/models.py | 62 +- aria/tools/lru_cache.py | 6 +- aria/workflows/api/__init__.py | 6 + aria/workflows/api/task.py | 109 +++ aria/workflows/api/task_graph.py | 290 ++++++++ aria/workflows/api/tasks_graph.py | 203 ----- aria/workflows/builtin/execute_operation.py | 34 +- aria/workflows/builtin/heal.py | 163 ++-- aria/workflows/builtin/install.py | 23 +- aria/workflows/builtin/uninstall.py | 17 +- aria/workflows/builtin/workflows.py | 144 ++-- aria/workflows/core/__init__.py | 6 + aria/workflows/core/engine.py | 19 +- aria/workflows/core/task.py | 200 +++++ aria/workflows/core/tasks.py | 121 
--- aria/workflows/core/translation.py | 45 +- aria/workflows/exceptions.py | 7 + aria/workflows/executor/blocking.py | 5 +- aria/workflows/executor/celery.py | 5 +- aria/workflows/executor/multiprocess.py | 7 +- aria/workflows/executor/thread.py | 6 +- requirements.txt | 2 + setup.py | 4 - tests/.pylintrc | 21 +- tests/mock/__init__.py | 16 + tests/mock/context.py | 32 + tests/mock/models.py | 132 ++++ tests/mock/operations.py | 33 + tests/test_logger.py | 2 +- tests/workflows/__init__.py | 2 + tests/workflows/api/__init__.py | 15 + tests/workflows/api/test_task.py | 98 +++ tests/workflows/api/test_task_graph.py | 745 +++++++++++++++++++ tests/workflows/builtin/__init__.py | 86 +++ .../workflows/builtin/test_execute_operation.py | 51 ++ tests/workflows/builtin/test_heal.py | 88 +++ tests/workflows/builtin/test_install.py | 39 + tests/workflows/builtin/test_uninstall.py | 39 + tests/workflows/core/__init__.py | 14 + tests/workflows/core/test_executor.py | 136 ++++ .../test_task_graph_into_exececution_graph.py | 97 +++ tests/workflows/test_engine.py | 41 +- tests/workflows/test_executor.py | 136 ---- .../test_task_graph_into_exececution_graph.py | 79 -- tox.ini | 4 +- 57 files changed, 2894 insertions(+), 1112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 8a54504..5413ff2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,7 @@ env: - TOX_ENV=py26 install: - pip install --upgrade pip + - pip install --upgrade setuptools - pip install tox script: - pip --version http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/.pylintrc ---------------------------------------------------------------------- diff --git a/aria/.pylintrc b/aria/.pylintrc index eb188a0..e5ee9de 100644 --- a/aria/.pylintrc +++ 
b/aria/.pylintrc @@ -1,3 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + [MASTER] # Python code to execute, usually for sys.path manipulation such as @@ -62,7 +77,7 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" 
-disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,logging-format-interpolation +disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,logging-format-interpolation,import-error [REPORTS] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/__init__.py ---------------------------------------------------------------------- diff 
--git a/aria/__init__.py b/aria/__init__.py index 4d9b0f4..eca7b9b 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -56,7 +56,7 @@ def application_model_storage(driver): models.DeploymentModification, models.Execution, models.ProviderContext, - models.Operation, + models.Task, ]) return _model_storage[driver] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index ddc27b5..d3698fd 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -29,7 +29,7 @@ from aria import application_model_storage, application_resource_storage from aria.logger import LoggerMixin from aria.storage import FileSystemModelDriver, FileSystemResourceDriver from aria.tools.application import StorageManager -from aria.contexts import WorkflowContext +from aria.context.workflow import WorkflowContext from aria.workflows.core.engine import Engine from aria.workflows.executor.thread import ThreadExecutor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/context/__init__.py ---------------------------------------------------------------------- diff --git a/aria/context/__init__.py b/aria/context/__init__.py new file mode 100644 index 0000000..20e19db --- /dev/null +++ b/aria/context/__init__.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Provides contexts to workflow and operation +""" + +from . import workflow, operation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/context/operation.py b/aria/context/operation.py new file mode 100644 index 0000000..d4d229a --- /dev/null +++ b/aria/context/operation.py @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Workflow and operation contexts +""" + +from uuid import uuid4 + +from aria.logger import LoggerMixin + +class OperationContext(LoggerMixin): + """ + Context object used during operation creation and execution + """ + + def __init__( + self, + name, + operation_details, + workflow_context, + node_instance, + inputs=None): + super(OperationContext, self).__init__() + self.name = name + self.id = str(uuid4()) + self.operation_details = operation_details + self.workflow_context = workflow_context + self.node_instance = node_instance + self.inputs = inputs or {} + + def __repr__(self): + details = ', '.join( + '{0}={1}'.format(key, value) + for key, value in self.operation_details.items()) + return '{name}({0})'.format(details, name=self.name) + + def __getattr__(self, attr): + try: + return getattr(self.workflow_context, attr) + except AttributeError: + return super(OperationContext, self).__getattribute__(attr) + + @property + def operation(self): + """ + The model operation + """ + return self.storage.operation.get(self.id) + + @operation.setter + def operation(self, value): + """ + Store the operation in the model storage + """ + self.storage.operation.store(value) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py new file mode 100644 index 0000000..8183d42 --- /dev/null +++ b/aria/context/workflow.py @@ -0,0 +1,190 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow and operation contexts +""" + +import threading +from uuid import uuid4 +from contextlib import contextmanager + +from .. import logger +from ..tools.lru_cache import lru_cache +from .. import exceptions + + +class ContextException(exceptions.AriaError): + """ + Context based exception + """ + pass + + +class WorkflowContext(logger.LoggerMixin): + """ + Context object used during workflow creation and execution + """ + + def __init__( + self, + name, + model_storage, + resource_storage, + deployment_id, + workflow_id, + execution_id=None, + parameters=None, + **kwargs): + super(WorkflowContext, self).__init__(**kwargs) + self.name = name + self.id = str(uuid4()) + self.model = model_storage + self.resource = resource_storage + self.deployment_id = deployment_id + self.workflow_id = workflow_id + self.execution_id = execution_id or str(uuid4()) + self.parameters = parameters or {} + + def __repr__(self): + return ( + '{name}(deployment_id={self.deployment_id}, ' + 'workflow_id={self.workflow_id}, ' + 'execution_id={self.execution_id})'.format( + name=self.__class__.__name__, self=self)) + + @property + def blueprint_id(self): + """ + The blueprint id + """ + return self.deployment.blueprint_id + + @property + @lru_cache() + def blueprint(self): + """ + The blueprint model + """ + return self.model.blueprint.get(self.blueprint_id) + + @property + @lru_cache() + def deployment(self): + """ + The deployment model + """ + return self.model.deployment.get(self.deployment_id) + + @property + def nodes(self): + """ + Iterator over nodes + """ + return 
self.model.node.iter( + filters={'blueprint_id': self.blueprint_id}) + + @property + def node_instances(self): + """ + Iterator over node instances + """ + return self.model.node_instance.iter(filters={'deployment_id': self.deployment_id}) + + @property + def execution(self): + """ + The execution model + """ + return self.model.execution.get(self.execution_id) + + @execution.setter + def execution(self, value): + """ + Store the execution in the model storage + """ + self.model.execution.store(value) + + def download_blueprint_resource(self, destination, path=None): + """ + Download a blueprint resource from the resource storage + """ + return self.resource.blueprint.download( + entry_id=self.blueprint_id, + destination=destination, + path=path) + + def download_deployment_resource(self, destination, path=None): + """ + Download a deployment resource from the resource storage + """ + return self.resource.deployment.download( + entry_id=self.deployment_id, + destination=destination, + path=path) + + @lru_cache() + def get_deployment_resource_data(self, path=None): + """ + Read a deployment resource as string from the resource storage + """ + return self.resource.deployment.data(entry_id=self.deployment_id, path=path) + + @lru_cache() + def get_blueprint_resource_data(self, path=None): + """ + Read a blueprint resource as string from the resource storage + """ + return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path) + + +class _CurrentContext(threading.local): + """ + Provides thread-level context, which sugarcoats the task api. 
+ """ + + def __init__(self): + super(_CurrentContext, self).__init__() + self._workflow_context = None + + def _set(self, value): + self._workflow_context = value + + def get(self): + """ + Retrieves the current workflow context + :return: the workflow context + :rtype: WorkflowContext + """ + if self._workflow_context is not None: + return self._workflow_context + raise ContextException("No context was set") + + @contextmanager + def push(self, workflow_context): + """ + Switches the current context to the provided context + :param workflow_context: the context to switch to. + :yields: the current context + """ + prev_workflow_context = self._workflow_context + self._set(workflow_context) + try: + yield self + finally: + self._set(prev_workflow_context) + +current = _CurrentContext() + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/contexts.py ---------------------------------------------------------------------- diff --git a/aria/contexts.py b/aria/contexts.py deleted file mode 100644 index fdd26a2..0000000 --- a/aria/contexts.py +++ /dev/null @@ -1,211 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -Workflow and operation contexts -""" - -from uuid import uuid4 - -from aria.logger import LoggerMixin -from aria.tools.lru_cache import lru_cache -from aria.workflows.api.tasks_graph import TaskGraph - - -class WorkflowContext(LoggerMixin): - """ - Context object used during workflow creation and execution - """ - - def __init__( - self, - name, - model_storage, - resource_storage, - deployment_id, - workflow_id, - parameters=None, - **kwargs): - super(WorkflowContext, self).__init__(**kwargs) - self.name = name - self.id = str(uuid4()) - self.model = model_storage - self.resource = resource_storage - self.deployment_id = deployment_id - self.workflow_id = workflow_id - self.execution_id = str(uuid4()) - self.parameters = parameters or {} - - def __repr__(self): - return ( - '{name}(deployment_id={self.deployment_id}, ' - 'workflow_id={self.workflow_id}, ' - 'execution_id={self.execution_id})'.format( - name=self.__class__.__name__, self=self)) - - def operation( - self, - name, - operation_details, - node_instance, - inputs=None): - """ - Called during workflow creation, return an operation context. This object should be added to - the task graph. 
- """ - return OperationContext( - name=name, - operation_details=operation_details, - workflow_context=self, - node_instance=node_instance, - inputs=inputs or {}) - - @property - def task_graph(self): - """ - The task graph class - """ - return TaskGraph - - @property - def blueprint_id(self): - """ - The blueprint id - """ - return self.deployment.blueprint_id - - @property - @lru_cache() - def blueprint(self): - """ - The blueprint model - """ - return self.model.blueprint.get(self.blueprint_id) - - @property - @lru_cache() - def deployment(self): - """ - The deployment model - """ - return self.model.deployment.get(self.deployment_id) - - @property - def nodes(self): - """ - Iterator over nodes - """ - return self.model.node.iter( - filters={'blueprint_id': self.blueprint_id}) - - @property - def node_instances(self): - """ - Iterator over node instances - """ - return self.model.node_instance.iter(filters={'deployment_id': self.deployment_id}) - - @property - def execution(self): - """ - The execution model - """ - return self.model.execution.get(self.execution_id) - - @execution.setter - def execution(self, value): - """ - Store the execution in the model storage - """ - self.model.execution.store(value) - - def download_blueprint_resource(self, destination, path=None): - """ - Download a blueprint resource from the resource storage - """ - return self.resource.blueprint.download( - entry_id=self.blueprint_id, - destination=destination, - path=path) - - def download_deployment_resource(self, destination, path=None): - """ - Download a deployment resource from the resource storage - """ - return self.resource.deployment.download( - entry_id=self.deployment_id, - destination=destination, - path=path) - - @lru_cache() - def get_deployment_resource_data(self, path=None): - """ - Read a deployment resource as string from the resource storage - """ - return self.resource.deployment.data(entry_id=self.deployment_id, path=path) - - @lru_cache() - def 
get_blueprint_resource_data(self, path=None): - """ - Read a blueprint resource as string from the resource storage - """ - return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path) - - -class OperationContext(LoggerMixin): - """ - Context object used during operation creation and execution - """ - - def __init__( - self, - name, - operation_details, - workflow_context, - node_instance, - inputs=None): - super(OperationContext, self).__init__() - self.name = name - self.id = str(uuid4()) - self.operation_details = operation_details - self.workflow_context = workflow_context - self.node_instance = node_instance - self.inputs = inputs or {} - - def __repr__(self): - details = ', '.join( - '{0}={1}'.format(key, value) - for key, value in self.operation_details.items()) - return '{name}({0})'.format(details, name=self.name) - - def __getattr__(self, attr): - try: - return getattr(self.workflow_context, attr) - except AttributeError: - return super(OperationContext, self).__getattribute__(attr) - - @property - def operation(self): - """ - The model operation - """ - return self.model.operation.get(self.id) - - @operation.setter - def operation(self, value): - """ - Store the operation in the model storage - """ - self.model.operation.store(value) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/decorators.py ---------------------------------------------------------------------- diff --git a/aria/decorators.py b/aria/decorators.py index 7bc41b3..a07e2ee 100644 --- a/aria/decorators.py +++ b/aria/decorators.py @@ -20,12 +20,13 @@ Workflow and operation decorators from uuid import uuid4 from functools import partial, wraps -from aria.tools.validation import validate_function_arguments +from . 
import context +from .workflows.api import task_graph +from .tools.validation import validate_function_arguments def workflow( func=None, - workflow_context=True, simple_workflow=True, suffix_template=''): """ @@ -34,31 +35,29 @@ def workflow( if func is None: return partial( workflow, - workflow_context=workflow_context, simple_workflow=simple_workflow, suffix_template=suffix_template) @wraps(func) - def _wrapper(context, **custom_kwargs): + def _wrapper(ctx, **workflow_parameters): + workflow_name = _generate_workflow_name( func_name=func.__name__, suffix_template=suffix_template, - context=context, - **custom_kwargs) - func_kwargs = _create_func_kwargs( - custom_kwargs, - context, - add_context=workflow_context, - workflow_name=workflow_name) - validate_function_arguments(func, func_kwargs) - func(**func_kwargs) - return func_kwargs['graph'] + ctx=ctx, + **workflow_parameters) + + workflow_parameters.setdefault('ctx', ctx) + workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name)) + validate_function_arguments(func, workflow_parameters) + with context.workflow.current.push(ctx): + func(**workflow_parameters) + return workflow_parameters['graph'] return _wrapper def operation( - func=None, - operation_context=True): + func=None): """ Operation decorator """ @@ -66,29 +65,25 @@ def operation( return partial(operation) @wraps(func) - def _wrapper(context, **custom_kwargs): + def _wrapper(ctx, **custom_kwargs): func_kwargs = _create_func_kwargs( custom_kwargs, - context, - add_context=operation_context) + ctx) validate_function_arguments(func, func_kwargs) - context.description = func.__doc__ + ctx.description = func.__doc__ return func(**func_kwargs) return _wrapper -def _generate_workflow_name(func_name, context, suffix_template, **custom_kwargs): +def _generate_workflow_name(func_name, ctx, suffix_template, **custom_kwargs): return '{func_name}.{suffix}'.format( func_name=func_name, - suffix=suffix_template.format(context=context, 
**custom_kwargs) or str(uuid4())) + suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4())) def _create_func_kwargs( kwargs, - context, - add_context=True, + ctx, workflow_name=None): - if add_context: - kwargs['context'] = context - kwargs.setdefault('graph', context.task_graph(workflow_name)) + kwargs.setdefault('graph', ctx.task_graph(workflow_name)) return kwargs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index 2dfbd00..2abdd9f 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -36,37 +36,29 @@ from . import ( @sent_task_signal.connect def _task_sent(task, *args, **kwargs): - operation_context = task.context - operation = operation_context.operation - operation.status = operation.SENT - operation_context.operation = operation + with task.update(): + task.status = task.SENT @start_task_signal.connect def _task_started(task, *args, **kwargs): - operation_context = task.context - operation = operation_context.operation - operation.started_at = datetime.utcnow() - operation.status = operation.STARTED - operation_context.operation = operation + with task.update(): + task.started_at = datetime.utcnow() + task.status = task.STARTED @on_failure_task_signal.connect def _task_failed(task, *args, **kwargs): - operation_context = task.context - operation = operation_context.operation - operation.ended_at = datetime.utcnow() - operation.status = operation.FAILED - operation_context.operation = operation + with task.update(): + task.ended_at = datetime.utcnow() + task.status = task.FAILED @on_success_task_signal.connect def _task_succeeded(task, *args, **kwargs): - operation_context = task.context - operation = operation_context.operation - operation.ended_at = datetime.utcnow() - 
operation.status = operation.SUCCESS - operation_context.operation = operation + with task.update(): + task.ended_at = datetime.utcnow() + task.status = task.SUCCESS @start_workflow_signal.connect http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/events/workflow_engine_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py index 60138e1..2f74ded 100644 --- a/aria/events/workflow_engine_event_handler.py +++ b/aria/events/workflow_engine_event_handler.py @@ -33,36 +33,30 @@ from . import ( @start_task_signal.connect def _start_task_handler(task, **kwargs): - task.logger.debug( - 'Event: Starting task: {task.name}'.format(task=task)) + task.logger.debug('Event: Starting task: {task.name}'.format(task=task)) @on_success_task_signal.connect def _success_task_handler(task, **kwargs): - task.logger.debug( - 'Event: Task success: {task.name}'.format(task=task)) + task.logger.debug('Event: Task success: {task.name}'.format(task=task)) @on_failure_task_signal.connect def _failure_operation_handler(task, **kwargs): - task.logger.error( - 'Event: Task failure: {task.name}'.format(task=task), - exc_info=kwargs.get('exception', True)) + task.logger.error('Event: Task failure: {task.name}'.format(task=task), + exc_info=kwargs.get('exception', True)) @start_workflow_signal.connect def _start_workflow_handler(context, **kwargs): - context.logger.debug( - 'Event: Starting workflow: {context.name}'.format(context=context)) + context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context)) @on_failure_workflow_signal.connect def _failure_workflow_handler(context, **kwargs): - context.logger.debug( - 'Event: Workflow failure: {context.name}'.format(context=context)) + context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context)) @on_success_workflow_signal.connect def 
_success_workflow_handler(context, **kwargs): - context.logger.debug( - 'Event: Workflow success: {context.name}'.format(context=context)) + context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 4959126..0002cb5 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -37,7 +37,7 @@ class LoggerMixin(object): def __init__(self, *args, **kwargs): self.logger_name = self.logger_name or self.__class__.__name__ - self.logger = _base_logger.getChild(self.logger_name) + self.logger = logging.getLogger('{0}.{1}'.format(_base_logger.name, self.logger_name)) self.logger.setLevel(self.logger_level) super(LoggerMixin, self).__init__(*args, **kwargs) @@ -63,7 +63,7 @@ class LoggerMixin(object): def __setstate__(self, obj_dict): vars(self).update( - logger=_base_logger.getChild(obj_dict['logger_name']), + logger=logging.getLogger('{0}.{1}'.format(_base_logger.name, obj_dict['logger_name'])), **obj_dict) @@ -112,7 +112,7 @@ class _DefaultConsoleFormat(logging.Formatter): self._fmt = '%(levelname)s: %(message)s' except AttributeError: return record.message - return super(_DefaultConsoleFormat, self).format(record) + return logging.Formatter.format(self, record) def create_file_log_handler( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index e5fc1ac..9aa7cf0 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -216,34 +216,6 @@ class Execution(Model): is_system_workflow = Field(type=bool, default=False) -class Operation(Model): - """ - A Model which represents an operation - """ - PENDING = 'pending' - SENT = 'sent' - STARTED = 'started' - SUCCESS = 
'success' - FAILED = 'failed' - STATES = ( - PENDING, - SENT, - STARTED, - SUCCESS, - FAILED, - ) - END_STATES = [SUCCESS, FAILED] - - id = Field(type=basestring, default=uuid_generator) - status = Field(type=basestring, choices=STATES, default=PENDING) - execution_id = Field(type=basestring) - eta = Field(type=datetime, default=datetime.now) - started_at = Field(type=datetime, default=None) - ended_at = Field(type=datetime, default=None) - max_retries = Field(type=int, default=0) - retry_count = Field(type=int, default=0) - - class Relationship(Model): """ A Model which represents a relationship @@ -397,3 +369,37 @@ class Plugin(Model): excluded_wheels = Field() supported_py_versions = Field(type=list) uploaded_at = Field(type=datetime) + + +class Task(Model): + """ + A Model which represents an task + """ + PENDING = 'pending' + SENT = 'sent' + STARTED = 'started' + SUCCESS = 'success' + FAILED = 'failed' + STATES = ( + PENDING, + SENT, + STARTED, + SUCCESS, + FAILED, + ) + END_STATES = [SUCCESS, FAILED] + + id = Field(type=basestring, default=uuid_generator) + status = Field(type=basestring, choices=STATES, default=PENDING) + execution_id = Field(type=basestring) + eta = Field(type=datetime, default=datetime.now) + started_at = Field(type=datetime, default=None) + ended_at = Field(type=datetime, default=None) + max_retries = Field(type=int, default=1) + retry_count = Field(type=int, default=0) + + # Operation specific fields + name = Field(type=basestring) + operation_details = Field(type=dict) + node_instance = PointerField(type=NodeInstance) + inputs = Field(type=dict, default=lambda: {}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/tools/lru_cache.py ---------------------------------------------------------------------- diff --git a/aria/tools/lru_cache.py b/aria/tools/lru_cache.py index 5863376..bb39b90 100755 --- a/aria/tools/lru_cache.py +++ b/aria/tools/lru_cache.py @@ -21,7 +21,11 @@ Function lru_cache implementation for 
python 2.7 from time import time from functools import partial, wraps from itertools import imap -from collections import OrderedDict + +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict class _LRUCache(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/api/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/__init__.py b/aria/workflows/api/__init__.py index ae1e83e..a3a17ee 100644 --- a/aria/workflows/api/__init__.py +++ b/aria/workflows/api/__init__.py @@ -12,3 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +""" +Provides API for building tasks +""" + +from . import task, task_graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py new file mode 100644 index 0000000..1070d99 --- /dev/null +++ b/aria/workflows/api/task.py @@ -0,0 +1,109 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Provides the tasks to be entered into the task graph +""" +from uuid import uuid4 + +from ... import context + + +class BaseTask(object): + """ + Abstract task_graph task + """ + def __init__(self, ctx=None, **kwargs): + if ctx is not None: + self._workflow_context = ctx + else: + self._workflow_context = context.workflow.current.get() + self._id = str(uuid4()) + + @property + def id(self): + """ + uuid4 generated id + :return: + """ + return self._id + + @property + def workflow_context(self): + """ + the context of the current workflow + :return: + """ + return self._workflow_context + + +class OperationTask(BaseTask): + """ + Represents an operation task in the task_graph + """ + + def __init__(self, + name, + operation_details, + node_instance, + inputs=None): + """ + Creates an operation task using the name, details, node instance and any additional kwargs. + :param name: the name of the operation. + :param operation_details: the details for the operation. + :param node_instance: the node instance on which this operation is registered. + :param inputs: operation inputs. 
+ """ + super(OperationTask, self).__init__() + self.name = name + self.operation_details = operation_details + self.node_instance = node_instance + self.inputs = inputs or {} + + +class WorkflowTask(BaseTask): + """ + Represents a workflow task in the task_graph + """ + def __init__(self, workflow_func, **kwargs): + """ + Creates a workflow based task using the workflow_func provided, and its kwargs + :param workflow_func: the function to run + :param kwargs: the kwargs that would be passed to the workflow_func + """ + super(WorkflowTask, self).__init__(**kwargs) + kwargs['ctx'] = self.workflow_context + self._graph = workflow_func(**kwargs) + + @property + def graph(self): + """ + The graph constructed by the sub workflow + :return: + """ + return self._graph + + def __getattr__(self, item): + try: + return getattr(self._graph, item) + except AttributeError: + return super(WorkflowTask, self).__getattribute__(item) + + +class StubTask(BaseTask): + """ + Enables creating empty tasks. + """ + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/api/task_graph.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task_graph.py b/aria/workflows/api/task_graph.py new file mode 100644 index 0000000..c88d343 --- /dev/null +++ b/aria/workflows/api/task_graph.py @@ -0,0 +1,290 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Task graph. Used by users to build workflows +""" + +from uuid import uuid4 +from collections import Iterable + +from networkx import DiGraph, topological_sort + +from . import task as api_task + + +class TaskNotInGraphError(Exception): + """ + An error representing a scenario where a given task is not in the graph as expected + """ + pass + + +def _filter_out_empty_tasks(func=None): + if func is None: + return lambda f: _filter_out_empty_tasks(func=f) + + def _wrapper(task, *tasks, **kwargs): + return func(*(t for t in [task] + list(tasks) if t), **kwargs) + return _wrapper + + +class TaskGraph(object): + """ + A tasks graph builder. 
+ Build an operations flow graph + """ + + def __init__(self, name): + self.name = name + self._id = str(uuid4()) + self._graph = DiGraph() + + def __repr__(self): + return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format( + name=self.__class__.__name__, self=self) + + @property + def id(self): + """ + Represents the id of the graph + :return: graph id + """ + return self._id + + # graph traversal methods + + @property + def tasks(self): + """ + An iterator on tasks added to the graph + :yields: Iterator over all tasks in the graph + """ + for _, data in self._graph.nodes_iter(data=True): + yield data['task'] + + def topological_order(self, reverse=False): + """ + Returns topological sort on the graph + :param reverse: whether to reverse the sort + :return: an iterator over the tasks in topological order + """ + for task_id in topological_sort(self._graph, reverse=reverse): + yield self.get_task(task_id) + + def get_dependencies(self, dependent_task): + """ + Iterates over the task's dependencies + :param BaseTask dependent_task: The task whose dependencies are requested + :yields: Iterator over all tasks which dependent_task depends on + :raise: TaskNotInGraphError if dependent_task is not in the graph + """ + if not self.has_tasks(dependent_task): + raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id)) + for _, dependency_id in self._graph.out_edges_iter(dependent_task.id): + yield self.get_task(dependency_id) + + def get_dependents(self, dependency_task): + """ + Iterates over the task's dependents + :param BaseTask dependency_task: The task whose dependents are requested + :yields: Iterator over all tasks which depend on dependency_task + :raise: TaskNotInGraphError if dependency_task is not in the graph + """ + if not self.has_tasks(dependency_task): + raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id)) + for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id): + yield self.get_task(dependent_id) 
+ + # task methods + + def get_task(self, task_id): + """ + Get a task instance that's been inserted to the graph by the task's id + :param basestring task_id: The task's id + :return: Requested task + :rtype: BaseTask + :raise: TaskNotInGraphError if no task found in the graph with the given id + """ + if not self._graph.has_node(task_id): + raise TaskNotInGraphError('Task id: {0}'.format(task_id)) + data = self._graph.node[task_id] + return data['task'] + + @_filter_out_empty_tasks + def add_tasks(self, *tasks): + """ + Add a task to the graph + :param BaseTask task: The task + :return: A list of added tasks + :rtype: list + """ + assert all([isinstance(task, (api_task.BaseTask, Iterable)) for task in tasks]) + return_tasks = [] + + for task in tasks: + if isinstance(task, Iterable): + return_tasks += self.add_tasks(*task) + elif not self.has_tasks(task): + self._graph.add_node(task.id, task=task) + return_tasks.append(task) + + return return_tasks + + @_filter_out_empty_tasks + def remove_tasks(self, *tasks): + """ + Remove the provided task from the graph + :param BaseTask task: The task + :return: A list of removed tasks + :rtype: list + """ + return_tasks = [] + + for task in tasks: + if isinstance(task, Iterable): + return_tasks += self.remove_tasks(*task) + elif self.has_tasks(task): + self._graph.remove_node(task.id) + return_tasks.append(task) + + return return_tasks + + @_filter_out_empty_tasks + def has_tasks(self, *tasks): + """ + Check whether a task is in the graph or not + :param BaseTask task: The task + :return: True if all tasks are in the graph, otherwise False + :rtype: bool + """ + assert all(isinstance(t, (api_task.BaseTask, Iterable)) for t in tasks) + return_value = True + + for task in tasks: + if isinstance(task, Iterable): + return_value &= self.has_tasks(*task) + else: + return_value &= self._graph.has_node(task.id) + + return return_value + + def add_dependency(self, dependent, dependency): + """ + Add a dependency for one item (task, 
sequence or parallel) on another + The dependent will only be executed after the dependency terminates + If either of the items is either a sequence or a parallel, + multiple dependencies may be added + :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) + :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) + :return: True if the dependency between the two hadn't already existed, otherwise False + :rtype: bool + :raise TaskNotInGraphError if either the dependent or dependency are tasks which + are not in the graph + """ + if not (self.has_tasks(dependent) and self.has_tasks(dependency)): + raise TaskNotInGraphError() + + if self.has_dependency(dependent, dependency): + return + + if isinstance(dependent, Iterable): + for dependent_task in dependent: + self.add_dependency(dependent_task, dependency) + else: + if isinstance(dependency, Iterable): + for dependency_task in dependency: + self.add_dependency(dependent, dependency_task) + else: + self._graph.add_edge(dependent.id, dependency.id) + + def has_dependency(self, dependent, dependency): + """ + Check whether one item (task, sequence or parallel) depends on another + + Note that if either of the items is either a sequence or a parallel, + and some of the dependencies exist in the graph but not all of them, + this method will return False + + :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) + :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) + :return: True if the dependency between the two exists, otherwise False + :rtype: bool + :raise TaskNotInGraphError if either the dependent or dependency are tasks + which are not in the graph + """ + if not (dependent and dependency): + return False + elif not (self.has_tasks(dependent) and self.has_tasks(dependency)): + raise TaskNotInGraphError() + + return_value = True + + if isinstance(dependent, Iterable): + for 
dependent_task in dependent: + return_value &= self.has_dependency(dependent_task, dependency) + else: + if isinstance(dependency, Iterable): + for dependency_task in dependency: + return_value &= self.has_dependency(dependent, dependency_task) + else: + return_value &= self._graph.has_edge(dependent.id, dependency.id) + + return return_value + + def remove_dependency(self, dependent, dependency): + """ + Remove a dependency for one item (task, sequence or parallel) on another + + Note that if either of the items is either a sequence or a parallel, and some of + the dependencies exist in the graph but not all of them, this method will not remove + any of the dependencies and return False + + :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) + :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) + :return: False if the dependency between the two hadn't existed, otherwise True + :rtype: bool + :raise TaskNotInGraphError if either the dependent or dependency are tasks + which are not in the graph + """ + if not (self.has_tasks(dependent) and self.has_tasks(dependency)): + raise TaskNotInGraphError() + + if not self.has_dependency(dependent, dependency): + return + + if isinstance(dependent, Iterable): + for dependent_task in dependent: + self.remove_dependency(dependent_task, dependency) + elif isinstance(dependency, Iterable): + for dependency_task in dependency: + self.remove_dependency(dependent, dependency_task) + else: + self._graph.remove_edge(dependent.id, dependency.id) + + @_filter_out_empty_tasks + def sequence(self, *tasks): + """ + Create and insert a sequence into the graph, effectively each task i depends on i-1 + :param tasks: an iterable of dependencies + :return: the provided tasks + """ + if tasks: + self.add_tasks(*tasks) + + for i in xrange(1, len(tasks)): + self.add_dependency(tasks[i], tasks[i-1]) + + return tasks 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/api/tasks_graph.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/tasks_graph.py b/aria/workflows/api/tasks_graph.py deleted file mode 100644 index 5160345..0000000 --- a/aria/workflows/api/tasks_graph.py +++ /dev/null @@ -1,203 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from uuid import uuid4 - -from networkx import DiGraph, topological_sort - -from aria.tools.validation import ValidatorMixin - - -class TaskNotFoundError(Exception): - pass - - -class TaskNotInGraphError(Exception): - pass - - -class TaskGraph(ValidatorMixin): - """ - A task graph builder. 
- Build a operations flow graph - """ - - def __init__(self, name): - self.name = name - self.id = str(uuid4()) - self.graph = DiGraph() - - def __getattr__(self, attr): - try: - return getattr(self.graph, attr) - except AttributeError: - return super(TaskGraph, self).__getattribute__(attr) - - def __repr__(self): - return '{name}(id={self.id}, name={self.name}, graph={self.graph!r})'.format( - name=self.__class__.__name__, self=self) - - @property - def tasks(self): - """ - An iterator on tasks added to the graph - """ - for _, data in self.graph.nodes_iter(data=True): - yield data['task'] - - @property - def leaf_tasks(self): - for task in self.tasks_in_order(): - if not self.graph.predecessors(task.id): - yield task - - def task_tree(self, reverse=False): - """ - Iterates over the tasks to be executed in topological order and their dependencies. - :param reverse: reverse the order - """ - for task in self.tasks_in_order(reverse=reverse): - yield task, self.task_dependencies(task) - - def tasks_in_order(self, reverse=False): - """ - Iterates over the tasks to be executed in topological order - :param reverse: reverse the order - """ - for task_id in topological_sort(self.graph, reverse=reverse): - yield self.graph.node[task_id]['task'] - - def has_dependencies(self, task): - return len(self.task_dependencies(task)) > 0 - - def task_dependencies(self, task): - """ - Iterates over the task dependencies - """ - for task_ids in self.graph.edges_iter(task.id): - for task_id in task_ids: - if task.id != task_id: - yield self.get_task(task_id) - - def add_task(self, task): - """ - Add a task to this graph - :param WorkflowTask|TaskGraph task: The task - """ - self.graph.add_node(task.id, task=task) - - def get_task(self, task_id): - """ - Get a task instance that was inserted to this graph by its id - - :param basestring task_id: the task id - :return: requested task - :rtype: WorkflowTask|TaskGraph - :raise: TaskNotFoundError if no task found with given id - """ - try: 
- data = self.graph.node[task_id] - return data['task'] - except KeyError: - raise TaskNotFoundError('Task id: {0}'.format(task_id)) - - def remove_task(self, task): - """ - Remove the provided task from the graph - :param WorkflowTask|graph task: The task - """ - self.graph.remove_node(task.id) - - def dependency(self, source_task, after): - """ - Add a dependency between tasks. - The source task will only be executed after the target task terminates. - A source task may depend on several tasks, - In which case it will only be executed after all its target tasks will terminate. - - tasks flow order: - after -> source_task - - :param WorkflowTask|TaskGraph source_task: The source task - :type source_task: WorkflowTask - :param list after: The target task - :raise TaskNotInGraphError - """ - if not self.graph.has_node(source_task.id): - raise TaskNotInGraphError( - 'source task {0!r} is not in graph (task id: {0.id})'.format(source_task)) - for target_task in after: - if not self.graph.has_node(target_task.id): - raise TaskNotInGraphError( - 'target task {0!r} is not in graph (task id: {0.id})'.format(target_task)) - self.graph.add_edge(source_task.id, target_task.id) - - # workflow creation helper methods - def chain(self, tasks, after=()): - """ - create a chain of tasks. - tasks will be added to the graph with a dependency between - the tasks by order. - - tasks flow order: - if tasks = (task0, task1, ..., taskn) - after -> task0 -> task1 -> ... -> taskn - - :param tasks: list of WorkflowTask instances. - :param after: target to the sequence - """ - for source_task in tasks: - self.add_task(source_task) - self.dependency(source_task, after=after) - after = (source_task,) - - def fan_out(self, tasks, after=()): - """ - create a fan-out. - tasks will be added to the graph with a dependency to - the target task. - - tasks flow order: - if tasks = (task0, task1, ..., taskn) - after -> task0 - |-> task1 - |... 
- \-> taskn - - :param tasks: list of WorkflowTask instances. - :param after: target to the tasks - """ - for source_task in tasks: - self.add_task(source_task) - self.dependency(source_task, after=after) - - def fan_in(self, source_task, after=None): - """ - create a fan-in. - source task will be added to the graph with a dependency to - the tasks. - - tasks flow order: - if after = (task0, task1, ..., taskn) - task0\ - task1|-> source_task - ... | - taskn/ - - :param source_task: source to the tasks - :param after: list of WorkflowTask instances. - """ - self.add_task(source_task) - self.dependency(source_task, after=after) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/execute_operation.py b/aria/workflows/builtin/execute_operation.py index 9e87c30..ddbb8e7 100644 --- a/aria/workflows/builtin/execute_operation.py +++ b/aria/workflows/builtin/execute_operation.py @@ -24,7 +24,7 @@ from .workflows import execute_operation_on_instance @workflow def execute_operation( - context, + ctx, graph, operation, operation_kwargs, @@ -37,7 +37,7 @@ def execute_operation( """ The execute_operation workflow - :param WorkflowContext context: the workflow context + :param WorkflowContext workflow_context: the workflow context :param TaskGraph graph: the graph which will describe the workflow. 
:param basestring operation: the operation name to execute :param dict operation_kwargs: @@ -52,7 +52,7 @@ def execute_operation( subgraphs = {} # filtering node instances filtered_node_instances = list(_filter_node_instances( - context=context, + context=ctx, node_ids=node_ids, node_instance_ids=node_instance_ids, type_names=type_names)) @@ -60,31 +60,31 @@ def execute_operation( if run_by_dependency_order: filtered_node_instances_ids = set(node_instance.id for node_instance in filtered_node_instances) - for node_instance in context.node_instances: + for node_instance in ctx.node_instances: if node_instance.id not in filtered_node_instances_ids: - subgraphs[node_instance.id] = context.task_graph( + subgraphs[node_instance.id] = ctx.task_graph( name='execute_operation_stub_{0}'.format(node_instance.id)) # registering actual tasks to sequences for node_instance in filtered_node_instances: - node_instance_sub_workflow = execute_operation_on_instance( - context=context, - graph=graph, - node_instance=node_instance, - operation=operation, - operation_kwargs=operation_kwargs, - allow_kwargs_override=allow_kwargs_override) - subgraphs[node_instance.id] = node_instance_sub_workflow + graph.add_tasks( + execute_operation_on_instance( + node_instance=node_instance, + operation=operation, + operation_kwargs=operation_kwargs, + allow_kwargs_override=allow_kwargs_override + ) + ) for _, node_instance_sub_workflow in subgraphs.items(): - graph.add_task(node_instance_sub_workflow) + graph.add_tasks(node_instance_sub_workflow) # adding tasks dependencies if required if run_by_dependency_order: - for node_instance in context.node_instances: + for node_instance in ctx.node_instances: for relationship_instance in node_instance.relationship_instances: - graph.dependency(source_task=subgraphs[node_instance.id], - after=[subgraphs[relationship_instance.target_id]]) + graph.add_dependency(source_task=subgraphs[node_instance.id], + after=[subgraphs[relationship_instance.target_id]]) def 
_filter_node_instances(context, node_ids=(), node_instance_ids=(), type_names=()): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/heal.py b/aria/workflows/builtin/heal.py index cab2e6e..dc320dc 100644 --- a/aria/workflows/builtin/heal.py +++ b/aria/workflows/builtin/heal.py @@ -19,50 +19,47 @@ Builtin heal workflow from aria import workflow -from .uninstall import uninstall -from .install import install -from .workflows import relationship_tasks +from .workflows import relationship_tasks, install_node_instance, uninstall_node_instance +from ..api import task @workflow -def heal(context, graph, node_instance_id): +def heal(ctx, graph, node_instance_id): """ The heal workflow - :param WorkflowContext context: the workflow context + :param WorkflowContext ctx: the workflow context :param TaskGraph graph: the graph which will describe the workflow. 
:param node_instance_id: the id of the node instance to heal :return: """ - failing_node = context.storage.node_instance.get(node_instance_id) - host_node = context.storage.node_instance.get(failing_node.host_id) - failed_node_instance_subgraph = _get_contained_subgraph(context, host_node) + failing_node = ctx.model.node_instance.get(node_instance_id) + host_node = ctx.model.node_instance.get(failing_node.host_id) + failed_node_instance_subgraph = _get_contained_subgraph(ctx, host_node) failed_node_instance_ids = list(n.id for n in failed_node_instance_subgraph) - targeted_node_instances = [ - context.storage.node_instance.get(relationship_instance.target_id) - for node_instance in failed_node_instance_subgraph - for relationship_instance in node_instance.relationship_instances - if relationship_instance.target_id not in failed_node_instance_ids - ] - - graph.chain([ - heal_uninstall( - context=context, - failing_node_instances=failed_node_instance_subgraph, - targeted_node_instances=targeted_node_instances), - heal_install( - context=context, - failing_node_instances=failed_node_instance_subgraph, - targeted_node_instances=targeted_node_instances) - ]) + targeted_node_instances = [node_instance for node_instance in ctx.node_instances + if node_instance.id not in failed_node_instance_ids] + + uninstall_subgraph = task.WorkflowTask( + heal_uninstall, + failing_node_instances=failed_node_instance_subgraph, + targeted_node_instances=targeted_node_instances + ) + + install_subgraph = task.WorkflowTask( + heal_install, + failing_node_instances=failed_node_instance_subgraph, + targeted_node_instances=targeted_node_instances) + + graph.sequence(uninstall_subgraph, install_subgraph) @workflow(suffix_template='{failing_node_instances}') -def heal_uninstall(context, graph, failing_node_instances, targeted_node_instances): +def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances): """ the uninstall part of the heal mechanism - :param WorkflowContext 
context: the workflow context + :param WorkflowContext ctx: the workflow context :param TaskGraph graph: the task graph to edit. :param failing_node_instances: the failing nodes to heal. :param targeted_node_instances: the targets of the relationships where the failing node are @@ -73,46 +70,49 @@ def heal_uninstall(context, graph, failing_node_instances, targeted_node_instanc # Create install stub workflow for each unaffected node instance for node_instance in targeted_node_instances: - node_instance_sub_workflow = context.task_graph( - name='uninstall_stub_{0}'.format(node_instance.id)) + node_instance_stub = task.StubTask() + node_instance_sub_workflows[node_instance.id] = node_instance_stub + graph.add_tasks(node_instance_stub) + + # create install sub workflow for every node instance + for node_instance in failing_node_instances: + node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance, + node_instance=node_instance) node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow - graph.add_task(node_instance_sub_workflow) + graph.add_tasks(node_instance_sub_workflow) - # Create install sub workflow for each failing node - uninstall( - context=context, - graph=graph, - node_instances=failing_node_instances, - node_instance_sub_workflows=node_instance_sub_workflows) + # create dependencies between the node instance sub workflow + for node_instance in failing_node_instances: + node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] + for relationship_instance in reversed(node_instance.relationship_instances): + graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], + node_instance_sub_workflow) - # Add operations for intact nodes depending on a node instance - # belonging to node_instances + # Add operations for intact nodes depending on a node instance belonging to node_instances for node_instance in targeted_node_instances: node_instance_sub_workflow = 
node_instance_sub_workflows[node_instance.id] for relationship_instance in reversed(node_instance.relationship_instances): - target_node_instance = context.storage.node_instance.get( - relationship_instance.target_id) - if target_node_instance in failing_node_instances: - after_tasks = [node_instance_sub_workflows[relationship.target_id] - for relationship in node_instance.relationship_instances] + target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id) + target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id] + graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow) - elif target_node_instance in targeted_node_instances: - after_tasks = [relationship_tasks( + if target_node_instance in failing_node_instances: + dependency = relationship_tasks( + graph=graph, node_instance=node_instance, relationship_instance=relationship_instance, - context=context, - operation_name='aria.interfaces.relationship_lifecycle.unlink')] + context=ctx, + operation_name='aria.interfaces.relationship_lifecycle.unlink') - if after_tasks: - graph.dependency(source_task=node_instance_sub_workflow, after=after_tasks) + graph.add_dependency(node_instance_sub_workflow, dependency) @workflow(suffix_template='{failing_node_instances}') -def heal_install(context, graph, failing_node_instances, targeted_node_instances): +def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): """ the install part of the heal mechanism - :param WorkflowContext context: the workflow context + :param WorkflowContext ctx: the workflow context :param TaskGraph graph: the task graph to edit. :param failing_node_instances: the failing nodes to heal. 
:param targeted_node_instances: the targets of the relationships where the failing node are @@ -123,17 +123,24 @@ def heal_install(context, graph, failing_node_instances, targeted_node_instances # Create install sub workflow for each unaffected for node_instance in targeted_node_instances: - node_instance_sub_workflow = context.task_graph( - name='install_stub_{0}'.format(node_instance.id)) - node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow - graph.add_task(node_instance_sub_workflow) + node_instance_stub = task.StubTask() + node_instance_sub_workflows[node_instance.id] = node_instance_stub + graph.add_tasks(node_instance_stub) # create install sub workflow for every node instance - install( - context=context, - graph=graph, - node_instances=failing_node_instances, - node_instance_sub_workflows=node_instance_sub_workflows) + for node_instance in failing_node_instances: + node_instance_sub_workflow = task.WorkflowTask(install_node_instance, + node_instance=node_instance) + node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow + graph.add_tasks(node_instance_sub_workflow) + + # create dependencies between the node instance sub workflow + for node_instance in failing_node_instances: + node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] + if node_instance.relationship_instances: + dependencies = [node_instance_sub_workflows[relationship_instance.target_id] + for relationship_instance in node_instance.relationship_instances] + graph.add_dependency(node_instance_sub_workflow, dependencies) # Add operations for intact nodes depending on a node instance # belonging to node_instances @@ -141,35 +148,33 @@ def heal_install(context, graph, failing_node_instances, targeted_node_instances node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] for relationship_instance in node_instance.relationship_instances: - target_node_instance = context.storage.node_instance.get( - 
relationship_instance.target_id) - if target_node_instance in failing_node_instances: - after_tasks = [node_instance_sub_workflows[relationship.target_id] - for relationship in node_instance.relationship_instances] + target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id) + target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id] + graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow) - elif target_node_instance in targeted_node_instances: - after_tasks = [relationship_tasks( + if target_node_instance in failing_node_instances: + dependent = relationship_tasks( + graph=graph, node_instance=node_instance, relationship_instance=relationship_instance, - context=context, - operation_name='aria.interfaces.relationship_lifecycle.establish')] + context=ctx, + operation_name='aria.interfaces.relationship_lifecycle.establish') - if after_tasks: - graph.dependency(source_task=node_instance_sub_workflow, after=after_tasks) + graph.add_dependency(dependent, node_instance_sub_workflow) def _get_contained_subgraph(context, host_node_instance): - contained_instances = set(node_instance - for node_instance in context.node_instances - if node_instance.host_id == host_node_instance.id and - node_instance.id != node_instance.host_id) - result = {host_node_instance} + contained_instances = [node_instance + for node_instance in context.node_instances + if node_instance.host_id == host_node_instance.id and + node_instance.id != node_instance.host_id] + result = [host_node_instance] if not contained_instances: return result - result.update(contained_instances) + result.extend(contained_instances) for node_instance in contained_instances: - result.update(_get_contained_subgraph(context, node_instance)) + result.extend(_get_contained_subgraph(context, node_instance)) - return result + return set(result) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/install.py b/aria/workflows/builtin/install.py index 35d2968..0ab3ad6 100644 --- a/aria/workflows/builtin/install.py +++ b/aria/workflows/builtin/install.py @@ -20,13 +20,14 @@ Builtin install workflow from aria import workflow from .workflows import install_node_instance +from ..api import task @workflow -def install(context, graph, node_instances=(), node_instance_sub_workflows=None): +def install(ctx, graph, node_instances=(), node_instance_sub_workflows=None): """ The install workflow - :param WorkflowContext context: the workflow context + :param WorkflowContext ctx: the workflow context :param TaskGraph graph: the graph which will describe the workflow. :param node_instances: the node instances on which to run the workflow :param dict node_instance_sub_workflows: a dictionary of subworkflows with id as key and @@ -34,21 +35,19 @@ def install(context, graph, node_instances=(), node_instance_sub_workflows=None) :return: """ node_instance_sub_workflows = node_instance_sub_workflows or {} - node_instances = node_instances or list(context.node_instances) + node_instances = node_instances or list(ctx.node_instances) # create install sub workflow for every node instance for node_instance in node_instances: - node_instance_sub_workflow = install_node_instance( - context=context, - node_instance=node_instance) + node_instance_sub_workflow = task.WorkflowTask(install_node_instance, + node_instance=node_instance) node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow - graph.add_task(node_instance_sub_workflow) + graph.add_tasks(node_instance_sub_workflow) # create dependencies between the node instance sub workflow for node_instance in node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - 
graph.dependency( - source_task=node_instance_sub_workflow, - after=[ - node_instance_sub_workflows[relationship.target_id] - for relationship in node_instance.relationship_instances]) + if node_instance.relationship_instances: + dependencies = [node_instance_sub_workflows[relationship_instance.target_id] + for relationship_instance in node_instance.relationship_instances] + graph.add_dependency(node_instance_sub_workflow, dependencies) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/uninstall.py b/aria/workflows/builtin/uninstall.py index 47c8259..f4e965c 100644 --- a/aria/workflows/builtin/uninstall.py +++ b/aria/workflows/builtin/uninstall.py @@ -20,10 +20,11 @@ Builtin uninstall workflow from aria import workflow from .workflows import uninstall_node_instance +from ..api import task @workflow -def uninstall(context, graph, node_instances=(), node_instance_sub_workflows=None): +def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None): """ The uninstall workflow :param WorkflowContext context: the workflow context @@ -34,20 +35,18 @@ def uninstall(context, graph, node_instances=(), node_instance_sub_workflows=Non :return: """ node_instance_sub_workflows = node_instance_sub_workflows or {} - node_instances = node_instances or list(context.node_instances) + node_instances = node_instances or list(ctx.node_instances) # create install sub workflow for every node instance for node_instance in node_instances: - node_instance_sub_workflow = uninstall_node_instance( - context=context, - node_instance=node_instance) + node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance, + node_instance=node_instance) node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow - graph.add_task(node_instance_sub_workflow) + graph.add_tasks(node_instance_sub_workflow) # 
create dependencies between the node instance sub workflow for node_instance in node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] for relationship_instance in reversed(node_instance.relationship_instances): - graph.dependency( - source_task=node_instance_sub_workflows[relationship_instance.target_id], - after=[node_instance_sub_workflow]) + graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], + node_instance_sub_workflow)
