http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/storage/drivers.py ---------------------------------------------------------------------- diff --git a/aria/storage/drivers.py b/aria/storage/drivers.py index 8b7d3af..1f96956 100644 --- a/aria/storage/drivers.py +++ b/aria/storage/drivers.py @@ -27,18 +27,16 @@ classes: * FileSystemResourceDriver - file system implementation for resource storage driver. """ +import distutils.dir_util # pylint: disable=no-name-in-module, import-error import os import shutil -# pylint has an issue with distutils and virtualenvs: https://github.com/PyCQA/pylint/issues/73 -import distutils.dir_util # pylint: disable=no-name-in-module, import-error from functools import partial from multiprocessing import RLock import jsonpickle -from ..exceptions import StorageError from ..logger import LoggerMixin - +from .exceptions import StorageError __all__ = ( 'ModelDriver',
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/storage/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py new file mode 100644 index 0000000..22dfc50 --- /dev/null +++ b/aria/storage/exceptions.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .. import exceptions + + +class StorageError(exceptions.AriaError): + """ + General storage exception + """ + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/storage/structures.py ---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py index 399922e..a833d99 100644 --- a/aria/storage/structures.py +++ b/aria/storage/structures.py @@ -27,14 +27,13 @@ classes: * Model - abstract model implementation. """ import json -from uuid import uuid4 from itertools import count +from uuid import uuid4 +from .exceptions import StorageError from ..logger import LoggerMixin -from ..exceptions import StorageError from ..tools.validation import ValidatorMixin - __all__ = ( 'uuid_generator', 'Field', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/tools/__init__.py ---------------------------------------------------------------------- diff --git a/aria/tools/__init__.py b/aria/tools/__init__.py index ae1e83e..320b445 100644 --- a/aria/tools/__init__.py +++ b/aria/tools/__init__.py @@ -12,3 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from .lru_cache import lru_cache +from .module import load_attribute +from .plugin import plugin_installer +from .process import Process +from .validation import validate_function_arguments, ValidatorMixin http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/tools/application.py ---------------------------------------------------------------------- diff --git a/aria/tools/application.py b/aria/tools/application.py index 360ba33..b1a7fcc 100644 --- a/aria/tools/application.py +++ b/aria/tools/application.py @@ -18,15 +18,15 @@ Convenience storage related tools. # TODO rename module name """ -import os import json +import os import shutil import tarfile import tempfile from datetime import datetime +from aria.storage.exceptions import StorageError from aria.logger import LoggerMixin -from aria.exceptions import StorageError class StorageManager(LoggerMixin): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/tools/process.py ---------------------------------------------------------------------- diff --git a/aria/tools/process.py b/aria/tools/process.py index 5a3d8a0..b9586b6 100644 --- a/aria/tools/process.py +++ b/aria/tools/process.py @@ -23,7 +23,7 @@ from signal import SIGKILL from time import sleep from aria.logger import LoggerMixin -from aria.exceptions import ExecutorException, ProcessException +from aria.orchestrator.workflows.exceptions import ExecutorException, ProcessException class Process(LoggerMixin): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/__init__.py b/aria/workflows/__init__.py deleted file mode 100644 index ae1e83e..0000000 --- a/aria/workflows/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/api/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/__init__.py b/aria/workflows/api/__init__.py deleted file mode 100644 index a3a17ee..0000000 --- a/aria/workflows/api/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Provides API for building tasks -""" - -from . import task, task_graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py deleted file mode 100644 index f6bf996..0000000 --- a/aria/workflows/api/task.py +++ /dev/null @@ -1,172 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Provides the tasks to be entered into the task graph -""" -from uuid import uuid4 - -from ... import ( - context, - storage, - exceptions, -) - - -class BaseTask(object): - """ - Abstract task_graph task - """ - def __init__(self, ctx=None, **kwargs): - if ctx is not None: - self._workflow_context = ctx - else: - self._workflow_context = context.workflow.current.get() - self._id = str(uuid4()) - - @property - def id(self): - """ - uuid4 generated id - :return: - """ - return self._id - - @property - def workflow_context(self): - """ - the context of the current workflow - :return: - """ - return self._workflow_context - - -class OperationTask(BaseTask): - """ - Represents an operation task in the task_graph - """ - - SOURCE_OPERATION = 'source_operations' - TARGET_OPERATION = 'target_operations' - - def __init__(self, - name, - actor, - operation_mapping, - max_attempts=None, - retry_interval=None, - ignore_failure=None, - inputs=None): - """ - Creates an operation task using the name, details, node instance and any additional kwargs. - :param name: the operation of the name. - :param operation_details: the details for the operation. - :param actor: the operation host on which this operation is registered. - :param inputs: operation inputs. - """ - assert isinstance(actor, (storage.models.NodeInstance, - storage.models.RelationshipInstance)) - super(OperationTask, self).__init__() - self.actor = actor - self.name = '{name}.{actor.id}'.format(name=name, actor=actor) - self.operation_mapping = operation_mapping - self.inputs = inputs or {} - self.max_attempts = (self.workflow_context._task_max_attempts - if max_attempts is None else max_attempts) - self.retry_interval = (self.workflow_context._task_retry_interval - if retry_interval is None else retry_interval) - self.ignore_failure = (self.workflow_context._task_ignore_failure - if ignore_failure is None else ignore_failure) - - @classmethod - def node_instance(cls, instance, name, inputs=None, *args, **kwargs): - """ - Represents a node based operation - - :param instance: the node of which this operation belongs to. - :param name: the name of the operation. - """ - assert isinstance(instance, storage.models.NodeInstance) - operation_details = instance.node.operations[name] - operation_inputs = operation_details.get('inputs', {}) - operation_inputs.update(inputs or {}) - return cls(name=name, - actor=instance, - operation_mapping=operation_details.get('operation', ''), - inputs=operation_inputs, - *args, - **kwargs) - - @classmethod - def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs): - """ - Represents a relationship based operation - - :param instance: the relationship of which this operation belongs to. - :param name: the name of the operation. - :param operation_end: source or target end of the relationship, this corresponds directly - with 'source_operations' and 'target_operations' - :param inputs any additional inputs to the operation - """ - assert isinstance(instance, storage.models.RelationshipInstance) - if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: - raise exceptions.TaskException('The operation end should be {0} or {1}'.format( - cls.TARGET_OPERATION, cls.SOURCE_OPERATION - )) - operation_details = getattr(instance.relationship, operation_end)[name] - operation_inputs = operation_details.get('inputs', {}) - operation_inputs.update(inputs or {}) - return cls(actor=instance, - name=name, - operation_mapping=operation_details.get('operation'), - inputs=operation_inputs, - *args, - **kwargs) - - -class WorkflowTask(BaseTask): - """ - Represents an workflow task in the task_graph - """ - def __init__(self, workflow_func, **kwargs): - """ - Creates a workflow based task using the workflow_func provided, and its kwargs - :param workflow_func: the function to run - :param kwargs: the kwargs that would be passed to the workflow_func - """ - super(WorkflowTask, self).__init__(**kwargs) - kwargs['ctx'] = self.workflow_context - self._graph = workflow_func(**kwargs) - - @property - def graph(self): - """ - The graph constructed by the sub workflow - :return: - """ - return self._graph - - def __getattr__(self, item): - try: - return getattr(self._graph, item) - except AttributeError: - return super(WorkflowTask, self).__getattribute__(item) - - -class StubTask(BaseTask): - """ - Enables creating empty tasks. - """ - pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/api/task_graph.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task_graph.py b/aria/workflows/api/task_graph.py deleted file mode 100644 index c88d343..0000000 --- a/aria/workflows/api/task_graph.py +++ /dev/null @@ -1,290 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Task graph. Used by users to build workflows -""" - -from uuid import uuid4 -from collections import Iterable - -from networkx import DiGraph, topological_sort - -from . import task as api_task - - -class TaskNotInGraphError(Exception): - """ - An error representing a scenario where a given task is not in the graph as expected - """ - pass - - -def _filter_out_empty_tasks(func=None): - if func is None: - return lambda f: _filter_out_empty_tasks(func=f) - - def _wrapper(task, *tasks, **kwargs): - return func(*(t for t in [task] + list(tasks) if t), **kwargs) - return _wrapper - - -class TaskGraph(object): - """ - A tasks graph builder. - Build an operations flow graph - """ - - def __init__(self, name): - self.name = name - self._id = str(uuid4()) - self._graph = DiGraph() - - def __repr__(self): - return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format( - name=self.__class__.__name__, self=self) - - @property - def id(self): - """ - Represents the id of the graph - :return: graph id - """ - return self._id - - # graph traversal methods - - @property - def tasks(self): - """ - An iterator on tasks added to the graph - :yields: Iterator over all tasks in the graph - """ - for _, data in self._graph.nodes_iter(data=True): - yield data['task'] - - def topological_order(self, reverse=False): - """ - Returns topological sort on the graph - :param reverse: whether to reverse the sort - :return: a list which represents the topological sort - """ - for task_id in topological_sort(self._graph, reverse=reverse): - yield self.get_task(task_id) - - def get_dependencies(self, dependent_task): - """ - Iterates over the task's dependencies - :param BaseTask dependent_task: The task whose dependencies are requested - :yields: Iterator over all tasks which dependency_task depends on - :raise: TaskNotInGraphError if dependent_task is not in the graph - """ - if not self.has_tasks(dependent_task): - raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id)) - for _, dependency_id in self._graph.out_edges_iter(dependent_task.id): - yield self.get_task(dependency_id) - - def get_dependents(self, dependency_task): - """ - Iterates over the task's dependents - :param BaseTask dependency_task: The task whose dependents are requested - :yields: Iterator over all tasks which depend on dependency_task - :raise: TaskNotInGraphError if dependency_task is not in the graph - """ - if not self.has_tasks(dependency_task): - raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id)) - for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id): - yield self.get_task(dependent_id) - - # task methods - - def get_task(self, task_id): - """ - Get a task instance that's been inserted to the graph by the task's id - :param basestring task_id: The task's id - :return: Requested task - :rtype: BaseTask - :raise: TaskNotInGraphError if no task found in the graph with the given id - """ - if not self._graph.has_node(task_id): - raise TaskNotInGraphError('Task id: {0}'.format(task_id)) - data = self._graph.node[task_id] - return data['task'] - - @_filter_out_empty_tasks - def add_tasks(self, *tasks): - """ - Add a task to the graph - :param BaseTask task: The task - :return: A list of added tasks - :rtype: list - """ - assert all([isinstance(task, (api_task.BaseTask, Iterable)) for task in tasks]) - return_tasks = [] - - for task in tasks: - if isinstance(task, Iterable): - return_tasks += self.add_tasks(*task) - elif not self.has_tasks(task): - self._graph.add_node(task.id, task=task) - return_tasks.append(task) - - return return_tasks - - @_filter_out_empty_tasks - def remove_tasks(self, *tasks): - """ - Remove the provided task from the graph - :param BaseTask task: The task - :return: A list of removed tasks - :rtype: list - """ - return_tasks = [] - - for task in tasks: - if isinstance(task, Iterable): - return_tasks += self.remove_tasks(*task) - elif self.has_tasks(task): - self._graph.remove_node(task.id) - return_tasks.append(task) - - return return_tasks - - @_filter_out_empty_tasks - def has_tasks(self, *tasks): - """ - Check whether a task is in the graph or not - :param BaseTask task: The task - :return: True if all tasks are in the graph, otherwise True - :rtype: list - """ - assert all(isinstance(t, (api_task.BaseTask, Iterable)) for t in tasks) - return_value = True - - for task in tasks: - if isinstance(task, Iterable): - return_value &= self.has_tasks(*task) - else: - return_value &= self._graph.has_node(task.id) - - return return_value - - def add_dependency(self, dependent, dependency): - """ - Add a dependency for one item (task, sequence or parallel) on another - The dependent will only be executed after the dependency terminates - If either of the items is either a sequence or a parallel, - multiple dependencies may be added - :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) - :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) - :return: True if the dependency between the two hadn't already existed, otherwise False - :rtype: bool - :raise TaskNotInGraphError if either the dependent or dependency are tasks which - are not in the graph - """ - if not (self.has_tasks(dependent) and self.has_tasks(dependency)): - raise TaskNotInGraphError() - - if self.has_dependency(dependent, dependency): - return - - if isinstance(dependent, Iterable): - for dependent_task in dependent: - self.add_dependency(dependent_task, dependency) - else: - if isinstance(dependency, Iterable): - for dependency_task in dependency: - self.add_dependency(dependent, dependency_task) - else: - self._graph.add_edge(dependent.id, dependency.id) - - def has_dependency(self, dependent, dependency): - """ - Check whether one item (task, sequence or parallel) depends on another - - Note that if either of the items is either a sequence or a parallel, - and some of the dependencies exist in the graph but not all of them, - this method will return False - - :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) - :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) - :return: True if the dependency between the two exists, otherwise False - :rtype: bool - :raise TaskNotInGraphError if either the dependent or dependency are tasks - which are not in the graph - """ - if not (dependent and dependency): - return False - elif not (self.has_tasks(dependent) and self.has_tasks(dependency)): - raise TaskNotInGraphError() - - return_value = True - - if isinstance(dependent, Iterable): - for dependent_task in dependent: - return_value &= self.has_dependency(dependent_task, dependency) - else: - if isinstance(dependency, Iterable): - for dependency_task in dependency: - return_value &= self.has_dependency(dependent, dependency_task) - else: - return_value &= self._graph.has_edge(dependent.id, dependency.id) - - return return_value - - def remove_dependency(self, dependent, dependency): - """ - Remove a dependency for one item (task, sequence or parallel) on another - - Note that if either of the items is either a sequence or a parallel, and some of - the dependencies exist in the graph but not all of them, this method will not remove - any of the dependencies and return False - - :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel) - :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel) - :return: False if the dependency between the two hadn't existed, otherwise True - :rtype: bool - :raise TaskNotInGraphError if either the dependent or dependency are tasks - which are not in the graph - """ - if not (self.has_tasks(dependent) and self.has_tasks(dependency)): - raise TaskNotInGraphError() - - if not self.has_dependency(dependent, dependency): - return - - if isinstance(dependent, Iterable): - for dependent_task in dependent: - self.remove_dependency(dependent_task, dependency) - elif isinstance(dependency, Iterable): - for dependency_task in dependency: - self.remove_dependency(dependent, dependency_task) - else: - self._graph.remove_edge(dependent.id, dependency.id) - - @_filter_out_empty_tasks - def sequence(self, *tasks): - """ - Create and insert a sequence into the graph, effectively each task i depends on i-1 - :param tasks: an iterable of dependencies - :return: the provided tasks - """ - if tasks: - self.add_tasks(*tasks) - - for i in xrange(1, len(tasks)): - self.add_dependency(tasks[i], tasks[i-1]) - - return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/builtin/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/__init__.py b/aria/workflows/builtin/__init__.py deleted file mode 100644 index 0449a8e..0000000 --- a/aria/workflows/builtin/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -A set of builtin workflows -""" - -from .install import install -from .uninstall import uninstall -from .execute_operation import execute_operation -from .heal import heal - - -__all__ = [ - 'install', - 'uninstall', - 'execute_operation', - 'heal', -] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/execute_operation.py b/aria/workflows/builtin/execute_operation.py deleted file mode 100644 index ddbb8e7..0000000 --- a/aria/workflows/builtin/execute_operation.py +++ /dev/null @@ -1,104 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Builtin execute_operation workflow -""" - -from aria import workflow - -from .workflows import execute_operation_on_instance - - -@workflow -def execute_operation( - ctx, - graph, - operation, - operation_kwargs, - allow_kwargs_override, - run_by_dependency_order, - type_names, - node_ids, - node_instance_ids, - **kwargs): - """ - The execute_operation workflow - - :param WorkflowContext workflow_context: the workflow context - :param TaskGraph graph: the graph which will describe the workflow. - :param basestring operation: the operation name to execute - :param dict operation_kwargs: - :param bool allow_kwargs_override: - :param bool run_by_dependency_order: - :param type_names: - :param node_ids: - :param node_instance_ids: - :param kwargs: - :return: - """ - subgraphs = {} - # filtering node instances - filtered_node_instances = list(_filter_node_instances( - context=ctx, - node_ids=node_ids, - node_instance_ids=node_instance_ids, - type_names=type_names)) - - if run_by_dependency_order: - filtered_node_instances_ids = set(node_instance.id - for node_instance in filtered_node_instances) - for node_instance in ctx.node_instances: - if node_instance.id not in filtered_node_instances_ids: - subgraphs[node_instance.id] = ctx.task_graph( - name='execute_operation_stub_{0}'.format(node_instance.id)) - - # registering actual tasks to sequences - for node_instance in filtered_node_instances: - graph.add_tasks( - execute_operation_on_instance( - node_instance=node_instance, - operation=operation, - operation_kwargs=operation_kwargs, - allow_kwargs_override=allow_kwargs_override - ) - ) - - for _, node_instance_sub_workflow in subgraphs.items(): - graph.add_tasks(node_instance_sub_workflow) - - # adding tasks dependencies if required - if run_by_dependency_order: - for node_instance in ctx.node_instances: - for relationship_instance in node_instance.relationship_instances: - graph.add_dependency(source_task=subgraphs[node_instance.id], - after=[subgraphs[relationship_instance.target_id]]) - - -def _filter_node_instances(context, node_ids=(), node_instance_ids=(), type_names=()): - def _is_node_by_id(node_id): - return not node_ids or node_id in node_ids - - def _is_node_instance_by_id(node_instance_id): - return not node_instance_ids or node_instance_id in node_instance_ids - - def _is_node_by_type(node_type_hierarchy): - return not type_names or node_type_hierarchy in type_names - - for node_instance in context.node_instances: - if all((_is_node_by_id(node_instance.node.id), - _is_node_instance_by_id(node_instance.id), - _is_node_by_type(node_instance.node.type_hierarchy))): - yield node_instance http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/heal.py b/aria/workflows/builtin/heal.py deleted file mode 100644 index dbfc14e..0000000 --- a/aria/workflows/builtin/heal.py +++ /dev/null @@ -1,174 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Builtin heal workflow -""" - -from aria import workflow - -from .workflows import relationship_tasks, install_node_instance, uninstall_node_instance -from ..api import task - - -@workflow -def heal(ctx, graph, node_instance_id): - """ - The heal workflow - - :param WorkflowContext ctx: the workflow context - :param TaskGraph graph: the graph which will describe the workflow. - :param node_instance_id: the id of the node instance to heal - :return: - """ - failing_node = ctx.model.node_instance.get(node_instance_id) - host_node = ctx.model.node_instance.get(failing_node.host_id) - failed_node_instance_subgraph = _get_contained_subgraph(ctx, host_node) - failed_node_instance_ids = list(n.id for n in failed_node_instance_subgraph) - - targeted_node_instances = [node_instance for node_instance in ctx.node_instances - if node_instance.id not in failed_node_instance_ids] - - uninstall_subgraph = task.WorkflowTask( - heal_uninstall, - failing_node_instances=failed_node_instance_subgraph, - targeted_node_instances=targeted_node_instances - ) - - install_subgraph = task.WorkflowTask( - heal_install, - failing_node_instances=failed_node_instance_subgraph, - targeted_node_instances=targeted_node_instances) - - graph.sequence(uninstall_subgraph, install_subgraph) - - -@workflow(suffix_template='{failing_node_instances}') -def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances): - """ - the uninstall part of the heal mechanism - :param WorkflowContext ctx: the workflow context - :param TaskGraph graph: the task graph to edit. - :param failing_node_instances: the failing nodes to heal. - :param targeted_node_instances: the targets of the relationships where the failing node are - source - :return: - """ - node_instance_sub_workflows = {} - - # Create install stub workflow for each unaffected node instance - for node_instance in targeted_node_instances: - node_instance_stub = task.StubTask() - node_instance_sub_workflows[node_instance.id] = node_instance_stub - graph.add_tasks(node_instance_stub) - - # create install sub workflow for every node instance - for node_instance in failing_node_instances: - node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance, - node_instance=node_instance) - node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow - graph.add_tasks(node_instance_sub_workflow) - - # create dependencies between the node instance sub workflow - for node_instance in failing_node_instances: - node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in reversed(node_instance.relationship_instances): - graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], - node_instance_sub_workflow) - - # Add operations for intact nodes depending on a node instance belonging to node_instances - for node_instance in targeted_node_instances: - node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - - for relationship_instance in reversed(node_instance.relationship_instances): - target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id) - target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id] - graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow) - - if target_node_instance in failing_node_instances: - dependency = relationship_tasks( - relationship_instance=relationship_instance, - operation_name='aria.interfaces.relationship_lifecycle.unlink') - graph.add_tasks(*dependency) - graph.add_dependency(node_instance_sub_workflow, dependency) - - -@workflow(suffix_template='{failing_node_instances}') -def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): - """ - the install part of the heal mechanism - :param WorkflowContext ctx: the workflow context - :param TaskGraph graph: the task graph to edit. - :param failing_node_instances: the failing nodes to heal. - :param targeted_node_instances: the targets of the relationships where the failing node are - source - :return: - """ - node_instance_sub_workflows = {} - - # Create install sub workflow for each unaffected - for node_instance in targeted_node_instances: - node_instance_stub = task.StubTask() - node_instance_sub_workflows[node_instance.id] = node_instance_stub - graph.add_tasks(node_instance_stub) - - # create install sub workflow for every node instance - for node_instance in failing_node_instances: - node_instance_sub_workflow = task.WorkflowTask(install_node_instance, - node_instance=node_instance) - node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow - graph.add_tasks(node_instance_sub_workflow) - - # create dependencies between the node instance sub workflow - for node_instance in failing_node_instances: - node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - if node_instance.relationship_instances: - dependencies = [node_instance_sub_workflows[relationship_instance.target_id] - for relationship_instance in node_instance.relationship_instances] - graph.add_dependency(node_instance_sub_workflow, dependencies) - - # Add operations for intact nodes depending on a node instance - # belonging to node_instances - for node_instance in targeted_node_instances: - node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - - for relationship_instance in node_instance.relationship_instances: - target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id) - target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id] - graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow) - - if target_node_instance in failing_node_instances: - dependent = relationship_tasks( - relationship_instance=relationship_instance, - operation_name='aria.interfaces.relationship_lifecycle.establish') - graph.add_tasks(*dependent) - graph.add_dependency(dependent, node_instance_sub_workflow) - - -def _get_contained_subgraph(context, host_node_instance): - contained_instances = [node_instance - for node_instance in context.node_instances - if node_instance.host_id == host_node_instance.id and - node_instance.id != node_instance.host_id] - result = [host_node_instance] - - if not contained_instances: - return result - - result.extend(contained_instances) - for node_instance in contained_instances: - result.extend(_get_contained_subgraph(context, node_instance)) - - return set(result) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/install.py b/aria/workflows/builtin/install.py deleted file mode 100644 index 0ab3ad6..0000000 --- a/aria/workflows/builtin/install.py +++ /dev/null @@ -1,53 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Builtin install workflow -""" - -from aria import workflow - -from .workflows import install_node_instance -from ..api import task - - -@workflow -def install(ctx, graph, node_instances=(), node_instance_sub_workflows=None): - """ - The install workflow - :param WorkflowContext ctx: the workflow context - :param TaskGraph graph: the graph which will describe the workflow. - :param node_instances: the node instances on which to run the workflow - :param dict node_instance_sub_workflows: a dictionary of subworkflows with id as key and - TaskGraph (or OperationContext) as value - :return: - """ - node_instance_sub_workflows = node_instance_sub_workflows or {} - node_instances = node_instances or list(ctx.node_instances) - - # create install sub workflow for every node instance - for node_instance in node_instances: - node_instance_sub_workflow = task.WorkflowTask(install_node_instance, - node_instance=node_instance) - node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow - graph.add_tasks(node_instance_sub_workflow) - - # create dependencies between the node instance sub workflow - for node_instance in node_instances: - node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - if node_instance.relationship_instances: - dependencies = [node_instance_sub_workflows[relationship_instance.target_id] - for relationship_instance in node_instance.relationship_instances] - graph.add_dependency(node_instance_sub_workflow, dependencies) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/uninstall.py b/aria/workflows/builtin/uninstall.py deleted file mode 100644 index f4e965c..0000000 --- a/aria/workflows/builtin/uninstall.py +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Builtin uninstall workflow -""" - -from aria import workflow - -from .workflows import uninstall_node_instance -from ..api import task - - -@workflow -def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None): - """ - The uninstall workflow - :param WorkflowContext context: the workflow context - :param TaskGraph graph: the graph which will describe the workflow. - :param node_instances: the node instances on which to run the workflow - :param dict node_instance_sub_workflows: a dictionary of subworkflows with id as key and - TaskGraph (or OperationContext) as value - :return: - """ - node_instance_sub_workflows = node_instance_sub_workflows or {} - node_instances = node_instances or list(ctx.node_instances) - - # create install sub workflow for every node instance - for node_instance in node_instances: - node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance, - node_instance=node_instance) - node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow - graph.add_tasks(node_instance_sub_workflow) - - # create dependencies between the node instance sub workflow - for node_instance in node_instances: - node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in reversed(node_instance.relationship_instances): - graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], - node_instance_sub_workflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/workflows.py b/aria/workflows/builtin/workflows.py deleted file mode 100644 index 0eb8c34..0000000 --- a/aria/workflows/builtin/workflows.py +++ /dev/null @@ -1,215 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -A set of builtin workflows. -""" - -from itertools import groupby - -from aria import workflow - -from ..api import task - - -__all__ = ( - 'install_node_instance', - 'uninstall_node_instance', - 'execute_operation_on_instance', -) - - -# Install node instance workflow and sub workflows - -@workflow(suffix_template='{node_instance.id}') -def install_node_instance(graph, node_instance, **kwargs): - """ - A workflow which installs a node instance. - :param WorkflowContext ctx: the workflow context - :param TaskGraph graph: the tasks graph of which to edit - :param node_instance: the node instance to install - :return: - """ - create_node_instance = task.OperationTask.node_instance( - instance=node_instance, - name='aria.interfaces.lifecycle.create') - - configure_node_instance = task.OperationTask.node_instance( - instance=node_instance, - name='aria.interfaces.lifecycle.configure') - start_node_instance = task.OperationTask.node_instance( - instance=node_instance, - name='aria.interfaces.lifecycle.start') - - graph.sequence( - create_node_instance, - preconfigure_relationship(graph, node_instance), - configure_node_instance, - postconfigure_relationship(graph, node_instance), - start_node_instance, - establish_relationship(graph, node_instance) - ) - - return graph - - -def preconfigure_relationship(graph, node_instance, **kwargs): - """ - - :param context: - :param graph: - :param node_instance: - :return: - """ - return relationships_tasks( - graph=graph, - operation_name='aria.interfaces.relationship_lifecycle.preconfigure', - node_instance=node_instance) - - -def postconfigure_relationship(graph, node_instance, **kwargs): - """ - - :param context: - :param graph: - :param node_instance: - :return: - """ - return relationships_tasks( - graph=graph, - operation_name='aria.interfaces.relationship_lifecycle.postconfigure', - node_instance=node_instance) - - -def establish_relationship(graph, node_instance, **kwargs): - """ - - :param context: - :param graph: - :param node_instance: - :return: - """ - return relationships_tasks( - graph=graph, - operation_name='aria.interfaces.relationship_lifecycle.establish', - node_instance=node_instance) - - -# Uninstall node instance workflow and subworkflows - -@workflow(suffix_template='{node_instance.id}') -def uninstall_node_instance(graph, node_instance, **kwargs): - """ - A workflow which uninstalls a node instance. - :param WorkflowContext context: the workflow context - :param TaskGraph graph: the tasks graph of which to edit - :param node_instance: the node instance to uninstall - :return: - """ - stop_node_instance = task.OperationTask.node_instance( - instance=node_instance, - name='aria.interfaces.lifecycle.stop') - delete_node_instance = task.OperationTask.node_instance( - instance=node_instance, - name='aria.interfaces.lifecycle.delete') - - graph.sequence( - stop_node_instance, - unlink_relationship(graph, node_instance), - delete_node_instance - ) - - -def unlink_relationship(graph, node_instance): - """ - - :param context: - :param graph: - :param node_instance: - :return: - """ - return relationships_tasks( - graph=graph, - operation_name='aria.interfaces.relationship_lifecycle.unlink', - node_instance=node_instance - ) - - -def execute_operation_on_instance( - node_instance, - operation, - operation_kwargs, - allow_kwargs_override): - """ - A workflow which executes a single operation - :param node_instance: the node instance to install - :param basestring operation: the operation name - :param dict operation_kwargs: - :param bool allow_kwargs_override: - :return: - """ - - if allow_kwargs_override is not None: - operation_kwargs['allow_kwargs_override'] = allow_kwargs_override - - return task.OperationTask.node_instance( - instance=node_instance, - name=operation, - inputs=operation_kwargs) - - -def relationships_tasks(graph, operation_name, node_instance): - """ - Creates a relationship task (source and target) for all of a node_instance relationships. - :param basestring operation_name: the relationship operation name. - :param WorkflowContext context: - :param NodeInstance node_instance: - :return: - """ - relationships_groups = groupby( - node_instance.relationship_instances, - key=lambda relationship_instance: relationship_instance.relationship.target_id) - - sub_tasks = [] - for _, (_, relationship_group) in enumerate(relationships_groups): - for relationship_instance in relationship_group: - relationship_operations = relationship_tasks( - relationship_instance=relationship_instance, - operation_name=operation_name) - sub_tasks.append(relationship_operations) - - return graph.sequence(*sub_tasks) - - -def relationship_tasks(relationship_instance, operation_name): - """ - Creates a relationship task source and target. - :param NodeInstance node_instance: the node instance of the relationship - :param RelationshipInstance relationship_instance: the relationship instance itself - :param WorkflowContext context: - :param operation_name: - :param index: the relationship index - enables pretty print - :return: - """ - source_operation = task.OperationTask.relationship_instance( - instance=relationship_instance, - name=operation_name, - operation_end=task.OperationTask.SOURCE_OPERATION) - target_operation = task.OperationTask.relationship_instance( - instance=relationship_instance, - name=operation_name, - operation_end=task.OperationTask.TARGET_OPERATION) - - return source_operation, target_operation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/__init__.py b/aria/workflows/core/__init__.py deleted file mode 100644 index e377153..0000000 --- a/aria/workflows/core/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Core for the workflow execution mechanism -""" - -from . import task, translation, engine http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py deleted file mode 100644 index 83ad097..0000000 --- a/aria/workflows/core/engine.py +++ /dev/null @@ -1,114 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -The workflow engine. Executes workflows -""" - -import time -from datetime import datetime - -import networkx - -from ... import events, logger -from ...storage import models -from .. import exceptions -from . import translation -from . import task as engine_task - - -class Engine(logger.LoggerMixin): - """ - The workflow engine. Executes workflows - """ - - def __init__(self, executor, workflow_context, tasks_graph, **kwargs): - super(Engine, self).__init__(**kwargs) - self._workflow_context = workflow_context - self._execution_graph = networkx.DiGraph() - self._executor = executor - translation.build_execution_graph(task_graph=tasks_graph, - execution_graph=self._execution_graph) - - def execute(self): - """ - execute the workflow - """ - try: - events.start_workflow_signal.send(self._workflow_context) - cancel = False - while True: - cancel = self._is_cancel() - if cancel: - break - for task in self._ended_tasks(): - self._handle_ended_tasks(task) - for task in self._executable_tasks(): - self._handle_executable_task(task) - if self._all_tasks_consumed(): - break - else: - time.sleep(0.1) - if cancel: - events.on_cancelled_workflow_signal.send(self._workflow_context) - else: - events.on_success_workflow_signal.send(self._workflow_context) - except BaseException as e: - events.on_failure_workflow_signal.send(self._workflow_context, exception=e) - raise - - def cancel_execution(self): - """ - Send a cancel request to the engine. If execution already started, execution status - will be modified to 'cancelling' status. If execution is in pending mode, execution status - will be modified to 'cancelled' directly. - """ - events.on_cancelling_workflow_signal.send(self._workflow_context) - - def _is_cancel(self): - return self._workflow_context.execution.status in [models.Execution.CANCELLING, - models.Execution.CANCELLED] - - def _executable_tasks(self): - now = datetime.utcnow() - return (task for task in self._tasks_iter() - if task.status in models.Task.WAIT_STATES and - task.due_at <= now and - not self._task_has_dependencies(task)) - - def _ended_tasks(self): - return (task for task in self._tasks_iter() if task.status in models.Task.END_STATES) - - def _task_has_dependencies(self, task): - return len(self._execution_graph.pred.get(task.id, {})) > 0 - - def _all_tasks_consumed(self): - return len(self._execution_graph.node) == 0 - - def _tasks_iter(self): - return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) - - def _handle_executable_task(self, task): - if isinstance(task, engine_task.StubTask): - task.status = models.Task.SUCCESS - else: - events.sent_task_signal.send(task) - self._executor.execute(task) - - def _handle_ended_tasks(self, task): - if task.status == models.Task.FAILED and not task.ignore_failure: - raise exceptions.ExecutorException('Workflow failed') - else: - self._execution_graph.remove_node(task.id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py deleted file mode 100644 index 9ab5697..0000000 --- a/aria/workflows/core/task.py +++ /dev/null @@ -1,242 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow tasks -""" -from contextlib import contextmanager -from datetime import datetime -from functools import ( - partial, - wraps, -) - -from ... import logger -from ...storage import models -from ...context import operation as operation_context -from .. import exceptions - - -def _locked(func=None): - if func is None: - return partial(_locked, func=_locked) - - @wraps(func) - def _wrapper(self, value, **kwargs): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") - return func(self, value, **kwargs) - return _wrapper - - -class BaseTask(logger.LoggerMixin): - """ - Base class for Task objects - """ - - def __init__(self, id, *args, **kwargs): - super(BaseTask, self).__init__(*args, **kwargs) - self._id = id - - @property - def id(self): - """ - :return: the task's id - """ - return self._id - - -class StubTask(BaseTask): - """ - Base stub task for all tasks that don't actually run anything - """ - - def __init__(self, *args, **kwargs): - super(StubTask, self).__init__(*args, **kwargs) - self.status = models.Task.PENDING - self.due_at = datetime.utcnow() - - -class StartWorkflowTask(StubTask): - """ - Tasks marking a workflow start - """ - pass - - -class EndWorkflowTask(StubTask): - """ - Tasks marking a workflow end - """ - pass - - -class StartSubWorkflowTask(StubTask): - """ - Tasks marking a subworkflow start - """ - pass - - -class EndSubWorkflowTask(StubTask): - """ - Tasks marking a subworkflow end - """ - pass - - -class OperationTask(BaseTask): - """ - Operation tasks - """ - - def __init__(self, api_task, *args, **kwargs): - super(OperationTask, self).__init__(id=api_task.id, **kwargs) - self._workflow_context = api_task._workflow_context - task_model = api_task._workflow_context.model.task.model_cls - operation_task = task_model( - id=api_task.id, - name=api_task.name, - operation_mapping=api_task.operation_mapping, - actor=api_task.actor, - inputs=api_task.inputs, - status=task_model.PENDING, - execution_id=self._workflow_context._execution_id, - max_attempts=api_task.max_attempts, - retry_interval=api_task.retry_interval, - ignore_failure=api_task.ignore_failure - ) - - if isinstance(api_task.actor, models.NodeInstance): - context_class = operation_context.NodeOperationContext - elif isinstance(api_task.actor, models.RelationshipInstance): - context_class = operation_context.RelationshipOperationContext - else: - raise RuntimeError('No operation context could be created for {0}' - .format(api_task.actor.model_cls)) - - self._ctx = context_class(name=api_task.name, - workflow_context=self._workflow_context, - task=operation_task) - self._workflow_context.model.task.store(operation_task) - self._task_id = operation_task.id - self._update_fields = None - - @contextmanager - def _update(self): - """ - A context manager which puts the task into update mode, enabling fields update. - :yields: None - """ - self._update_fields = {} - try: - yield - task = self.model_task - for key, value in self._update_fields.items(): - setattr(task, key, value) - self.model_task = task - finally: - self._update_fields = None - - @property - def model_task(self): - """ - Returns the task model in storage - :return: task in storage - """ - return self._workflow_context.model.task.get(self._task_id) - - @model_task.setter - def model_task(self, value): - self._workflow_context.model.task.store(value) - - @property - def context(self): - """ - Contexts for the operation - :return: - """ - return self._ctx - - @property - def status(self): - """ - Returns the task status - :return: task status - """ - return self.model_task.status - - @status.setter - @_locked - def status(self, value): - self._update_fields['status'] = value - - @property - def started_at(self): - """ - Returns when the task started - :return: when task started - """ - return self.model_task.started_at - - @started_at.setter - @_locked - def started_at(self, value): - self._update_fields['started_at'] = value - - @property - def ended_at(self): - """ - Returns when the task ended - :return: when task ended - """ - return self.model_task.ended_at - - @ended_at.setter - @_locked - def ended_at(self, value): - self._update_fields['ended_at'] = value - - @property - def retry_count(self): - """ - Returns the retry count for the task - :return: retry count - """ - return self.model_task.retry_count - - @retry_count.setter - @_locked - def retry_count(self, value): - self._update_fields['retry_count'] = value - - @property - def due_at(self): - """ - Returns the minimum datetime in which the task can be executed - :return: eta - """ - return self.model_task.due_at - - @due_at.setter - @_locked - def due_at(self, value): - self._update_fields['due_at'] = value - - def __getattr__(self, attr): - try: - return getattr(self.model_task, attr) - except AttributeError: - return super(OperationTask, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py deleted file mode 100644 index b6cbdad..0000000 --- a/aria/workflows/core/translation.py +++ /dev/null @@ -1,106 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Translation of user graph's API to the execution graph -""" - -from .. import api -from . import task as core_task - - -def build_execution_graph( - task_graph, - execution_graph, - start_cls=core_task.StartWorkflowTask, - end_cls=core_task.EndWorkflowTask, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param workflow_context: The workflow - :param execution_graph: The execution graph that is being built - :param start_cls: internal use - :param end_cls: internal use - :param depends_on: internal use - """ - # Insert start marker - start_task = start_cls(id=_start_graph_suffix(task_graph.id)) - _add_task_and_dependencies(execution_graph, start_task, depends_on) - - for api_task in task_graph.topological_order(reverse=True): - dependencies = task_graph.get_dependencies(api_task) - operation_dependencies = _get_tasks_from_dependencies( - execution_graph, - dependencies, - default=[start_task]) - - if isinstance(api_task, api.task.OperationTask): - # Add the task an the dependencies - operation_task = core_task.OperationTask(api_task) - _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) - elif isinstance(api_task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - build_execution_graph( - task_graph=api_task, - execution_graph=execution_graph, - start_cls=core_task.StartSubWorkflowTask, - end_cls=core_task.EndSubWorkflowTask, - depends_on=operation_dependencies - ) - elif isinstance(api_task, api.task.StubTask): - stub_task = core_task.StubTask(id=api_task.id) - _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - workflow_dependencies = _get_tasks_from_dependencies( - execution_graph, - _get_non_dependency_tasks(task_graph), - default=[start_task]) - end_task = end_cls(id=_end_graph_suffix(task_graph.id)) - _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) - - -def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()): - execution_graph.add_node(operation_task.id, task=operation_task) - for dependency in operation_dependencies: - execution_graph.add_edge(dependency.id, operation_task.id) - - -def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): - """ - Returns task list from dependencies. - """ - return [execution_graph.node[dependency.id - if isinstance(dependency, (api.task.OperationTask, - api.task.StubTask)) - else _end_graph_suffix(dependency.id)]['task'] - for dependency in dependencies] or default - - -def _start_graph_suffix(id): - return '{0}-Start'.format(id) - - -def _end_graph_suffix(id): - return '{0}-End'.format(id) - - -def _get_non_dependency_tasks(graph): - for task in graph.tasks: - if len(list(graph.get_dependents(task))) == 0: - yield task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/workflows/exceptions.py b/aria/workflows/exceptions.py deleted file mode 100644 index d7b189d..0000000 --- a/aria/workflows/exceptions.py +++ /dev/null @@ -1,70 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow related Exception classes -""" - - -class ExecutorException(Exception): - """ - General executor exception - """ - pass - - -class ProcessException(ExecutorException): - """ - Raised when subprocess execution fails - """ - - def __init__(self, command, stderr=None, stdout=None, return_code=None): - """ - Process class Exception - :param list command: child process command - :param str message: custom message - :param str stderr: child process stderr - :param str stdout: child process stdout - :param int return_code: child process exit code - """ - super(ProcessException, self).__init__("child process failed") - self.command = command - self.stderr = stderr - self.stdout = stdout - self.return_code = return_code - - @property - def explanation(self): - """ - Describes the error in detail - """ - return ( - 'Command "{error.command}" executed with an error.\n' - 'code: {error.return_code}\n' - 'error: {error.stderr}\n' - 'output: {error.stdout}'.format(error=self)) - - -class AriaEngineError(Exception): - """ - Raised by the workflow engine - """ - - -class TaskException(Exception): - """ - Raised by the task - """ - pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/__init__.py b/aria/workflows/executor/__init__.py deleted file mode 100644 index 09fb12c..0000000 --- a/aria/workflows/executor/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Executors for task execution -""" - - -from . import blocking, celery, multiprocess, thread http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/base.py b/aria/workflows/executor/base.py deleted file mode 100644 index 118ab2b..0000000 --- a/aria/workflows/executor/base.py +++ /dev/null @@ -1,54 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Base executor module -""" - -from aria import events - - -class BaseExecutor(object): - """ - Base class for executors for running tasks - """ - - def __init__(self, *args, **kwargs): - pass - - def execute(self, task): - """ - Execute a task - :param task: task to execute - """ - raise NotImplementedError - - def close(self): - """ - Close the executor - """ - pass - - @staticmethod - def _task_started(task): - events.start_task_signal.send(task) - - @staticmethod - def _task_failed(task, exception): - events.on_failure_task_signal.send(task, exception=exception) - - @staticmethod - def _task_succeeded(task): - events.on_success_task_signal.send(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/executor/blocking.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/blocking.py b/aria/workflows/executor/blocking.py deleted file mode 100644 index 30bebbe..0000000 --- a/aria/workflows/executor/blocking.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Blocking executor -""" - -from aria.tools import module -from .base import BaseExecutor - - -class CurrentThreadBlockingExecutor(BaseExecutor): - """ - Executor which runs tasks in the current thread (blocking) - """ - - def execute(self, task): - self._task_started(task) - try: - task_func = module.load_attribute(task.operation_mapping) - task_func(ctx=task.context, **task.inputs) - self._task_succeeded(task) - except BaseException as e: - self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/celery.py b/aria/workflows/executor/celery.py deleted file mode 100644 index baa97bd..0000000 --- a/aria/workflows/executor/celery.py +++ /dev/null @@ -1,97 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Celery based executor -""" - -import threading -import Queue - -from .base import BaseExecutor - - -class CeleryExecutor(BaseExecutor): - """ - Executor which runs tasks using celery - """ - - def __init__(self, app, *args, **kwargs): - super(CeleryExecutor, self).__init__(*args, **kwargs) - self._app = app - self._started_signaled = False - self._started_queue = Queue.Queue(maxsize=1) - self._tasks = {} - self._results = {} - self._receiver = None - self._stopped = False - self._receiver_thread = threading.Thread(target=self._events_receiver) - self._receiver_thread.daemon = True - self._receiver_thread.start() - self._started_queue.get(timeout=30) - - def execute(self, task): - self._tasks[task.id] = task - inputs = task.inputs.copy() - inputs['ctx'] = task.context - self._results[task.id] = self._app.send_task( - task.operation_mapping, - kwargs=inputs, - task_id=task.id, - queue=self._get_queue(task)) - - def close(self): - self._stopped = True - if self._receiver: - self._receiver.should_stop = True - self._receiver_thread.join() - - @staticmethod - def _get_queue(task): - return None if task else None # TODO - - def _events_receiver(self): - with self._app.connection() as connection: - self._receiver = self._app.events.Receiver(connection, handlers={ - 'task-started': self._celery_task_started, - 'task-succeeded': self._celery_task_succeeded, - 'task-failed': self._celery_task_failed, - }) - for _ in self._receiver.itercapture(limit=None, timeout=None, wakeup=True): - if not self._started_signaled: - self._started_queue.put(True) - self._started_signaled = True - if self._stopped: - return - - def _celery_task_started(self, event): - self._task_started(self._tasks[event['uuid']]) - - def _celery_task_succeeded(self, event): - task, _ = self._remove_task(event['uuid']) - self._task_succeeded(task) - - def _celery_task_failed(self, event): - task, async_result = self._remove_task(event['uuid']) - try: - exception = async_result.result - except BaseException as e: - exception = RuntimeError( - 'Could not de-serialize exception of task {0} --> {1}: {2}' - .format(task.name, type(e).__name__, str(e))) - self._task_failed(task, exception=exception) - - def _remove_task(self, task_id): - return self._tasks.pop(task_id), self._results.pop(task_id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/executor/multiprocess.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/multiprocess.py b/aria/workflows/executor/multiprocess.py deleted file mode 100644 index 545fbf6..0000000 --- a/aria/workflows/executor/multiprocess.py +++ /dev/null @@ -1,98 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Multiprocess based executor -""" - -import threading -import multiprocessing - -import jsonpickle - -from aria.tools import module -from .base import BaseExecutor - - -class MultiprocessExecutor(BaseExecutor): - """ - Executor which runs tasks in a multiprocess environment - """ - - def __init__(self, pool_size=1, *args, **kwargs): - super(MultiprocessExecutor, self).__init__(*args, **kwargs) - self._stopped = False - self._manager = multiprocessing.Manager() - self._queue = self._manager.Queue() - self._tasks = {} - self._listener_thread = threading.Thread(target=self._listener) - self._listener_thread.daemon = True - self._listener_thread.start() - self._pool = multiprocessing.Pool(processes=pool_size) - - def execute(self, task): - self._tasks[task.id] = task - self._pool.apply_async(_multiprocess_handler, args=( - self._queue, - task.context, - task.id, - task.operation_mapping, - task.inputs)) - - def close(self): - self._pool.close() - self._stopped = True - self._pool.join() - self._listener_thread.join() - - def _listener(self): - while not self._stopped: - try: - message = self._queue.get(timeout=1) - if message.type == 'task_started': - self._task_started(self._tasks[message.task_id]) - elif message.type == 'task_succeeded': - self._task_succeeded(self._remove_task(message.task_id)) - elif message.type == 'task_failed': - self._task_failed(self._remove_task(message.task_id), - exception=jsonpickle.loads(message.exception)) - else: - # TODO: something - raise RuntimeError() - # Daemon threads - except BaseException: - pass - - def _remove_task(self, task_id): - return self._tasks.pop(task_id) - - -class _MultiprocessMessage(object): - - def __init__(self, type, task_id, exception=None): - self.type = type - self.task_id = task_id - self.exception = exception - - -def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs): - queue.put(_MultiprocessMessage(type='task_started', task_id=task_id)) - try: - task_func = module.load_attribute(operation_mapping) - task_func(ctx=ctx, **operation_inputs) - queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id)) - except BaseException as e: - queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id, - exception=jsonpickle.dumps(e))) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/aria/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/thread.py b/aria/workflows/executor/thread.py deleted file mode 100644 index 6d29c1a..0000000 --- a/aria/workflows/executor/thread.py +++ /dev/null @@ -1,65 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Thread based executor -""" - -import threading -import Queue - -from aria.tools import module -from .base import BaseExecutor - - -class ThreadExecutor(BaseExecutor): - """ - Executor which runs tasks in a separate thread - """ - - def __init__(self, pool_size=1, *args, **kwargs): - super(ThreadExecutor, self).__init__(*args, **kwargs) - self._stopped = False - self._queue = Queue.Queue() - self._pool = [] - for i in range(pool_size): - name = 'ThreadExecutor-{index}'.format(index=i+1) - thread = threading.Thread(target=self._processor, name=name) - thread.daemon = True - thread.start() - self._pool.append(thread) - - def execute(self, task): - self._queue.put(task) - - def close(self): - self._stopped = True - for thread in self._pool: - thread.join() - - def _processor(self): - while not self._stopped: - try: - task = self._queue.get(timeout=1) - self._task_started(task) - try: - task_func = module.load_attribute(task.operation_mapping) - task_func(ctx=task.context, **task.inputs) - self._task_succeeded(task) - except BaseException as e: - self._task_failed(task, exception=e) - # Daemon threads - except BaseException: - pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/653365da/extensions/aria_extension_tosca/simple_v1_0/data_types.py ---------------------------------------------------------------------- diff --git a/extensions/aria_extension_tosca/simple_v1_0/data_types.py b/extensions/aria_extension_tosca/simple_v1_0/data_types.py index 40e1e21..e9cd97c 100644 --- a/extensions/aria_extension_tosca/simple_v1_0/data_types.py +++ b/extensions/aria_extension_tosca/simple_v1_0/data_types.py @@ -14,7 +14,10 @@ # limitations under the License. import re -from collections import OrderedDict +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict from functools import total_ordering from datetime import datetime, tzinfo, timedelta