Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-138-Make-logging-more-informative b180485c4 -> 2f038276e (forced update)
ARIA-138-Make-logging-more-informative Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2f038276 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2f038276 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2f038276 Branch: refs/heads/ARIA-138-Make-logging-more-informative Commit: 2f038276ea1aa01b5bdaf7e908aafe398f75b53d Parents: cf80675 Author: max-orlov <ma...@gigaspaces.com> Authored: Mon Apr 17 18:50:58 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Wed Apr 19 13:15:26 2017 +0300 ---------------------------------------------------------------------- aria/cli/commands/__init__.py | 1 + aria/cli/commands/executions.py | 12 +++- aria/cli/commands/logs.py | 24 ++++--- aria/cli/execution_logging.py | 66 ++++++++++++++++++++ aria/cli/logger.py | 18 ++++++ aria/logger.py | 4 +- aria/modeling/orchestration.py | 11 +++- aria/orchestrator/context/common.py | 39 ++++++------ aria/orchestrator/context/operation.py | 29 +-------- aria/orchestrator/context/workflow.py | 4 +- aria/orchestrator/workflow_runner.py | 6 +- aria/orchestrator/workflows/core/engine.py | 2 + aria/orchestrator/workflows/events_logging.py | 26 ++++---- aria/orchestrator/workflows/executor/base.py | 4 +- aria/orchestrator/workflows/executor/process.py | 22 ++++--- aria/orchestrator/workflows/executor/thread.py | 8 ++- tests/.pylintrc | 2 +- .../orchestrator/workflows/executor/__init__.py | 51 +++++++++++++++ .../workflows/executor/test_executor.py | 64 +++---------------- .../workflows/executor/test_process_executor.py | 37 +---------- 20 files changed, 246 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/cli/commands/__init__.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/__init__.py b/aria/cli/commands/__init__.py index 7777791..9ca5e47 100644 --- a/aria/cli/commands/__init__.py +++ b/aria/cli/commands/__init__.py @@ -1,3 +1,4 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index 730fd29..0783ad0 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -15,7 +15,9 @@ import os +from aria.cli import execution_logging from .. import utils +from .. import logger as cli_logger from ..table import print_data from ..cli import aria from ...modeling.models import Execution @@ -141,13 +143,19 @@ def start(workflow_name, logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) execution_thread.start() + + log_consumer = cli_logger.ModelLogConsumer(model_storage, workflow_runner.execution_id) try: while execution_thread.is_alive(): - # using join without a timeout blocks and ignores KeyboardInterrupt - execution_thread.join(1) + for log in log_consumer: + execution_logging.load(log).log() + except KeyboardInterrupt: _cancel_execution(workflow_runner, execution_thread, logger) + for log in log_consumer: + execution_logging.load(log).log() + # raise any errors from the execution thread (note these are not workflow execution errors) execution_thread.raise_error_if_exists() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/cli/commands/logs.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/logs.py b/aria/cli/commands/logs.py index f8873cd..e87ee3b 100644 --- a/aria/cli/commands/logs.py +++ b/aria/cli/commands/logs.py @@ -12,13 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from .. import utils +from ..logger import ModelLogConsumer from ..cli import aria +from .. import execution_logging @aria.group(name='logs') -@aria.options.verbose() def logs(): """Show logs from workflow executions """ @@ -31,19 +30,18 @@ def logs(): @aria.options.verbose() @aria.pass_model_storage @aria.pass_logger -def list(execution_id, - model_storage, - logger): +def list(execution_id, model_storage, logger): """Display logs for an execution """ logger.info('Listing logs for execution id {0}'.format(execution_id)) - logs_list = model_storage.log.list(filters=dict(execution_fk=execution_id), - sort=utils.storage_sort_param('created_at', False)) - # TODO: print logs nicely - if logs_list: - for log in logs_list: - print log - else: + log_consumer = ModelLogConsumer(model_storage, execution_id) + any_logs = False + + for log in log_consumer: + execution_logging.load(log).log() + any_logs = True + + if not any_logs: logger.info('\tNo logs') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/cli/execution_logging.py ---------------------------------------------------------------------- diff --git a/aria/cli/execution_logging.py b/aria/cli/execution_logging.py new file mode 100644 index 0000000..16a7d1a --- /dev/null +++ b/aria/cli/execution_logging.py @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from . import logger +from .env import env + +DEFAULT_FORMATTING = { + logger.NO_VERBOSE: {'main_msg': '{item.msg}'}, + logger.LOW_VERBOSE: { + 'main_msg': '{created_at} | {item.level[0]} | {item.msg}', + 'created_at': '%H:%M:%S' + } +} + + +class load(object): + + def __init__(self, item, formats=None): + self._item = item + self._formats = formats or DEFAULT_FORMATTING + + def __repr__(self): + # Only NO_VERBOSE and LOW_VERBOSE are configurable formats. configuring + # the low verbose level should affect any higher level. + formats = self._formats[min(env.logging.verbosity_level, logger.LOW_VERBOSE)] + + kwargs = dict(item=self._item) + if 'created_at' in formats: + kwargs['created_at'] = self._item.created_at.strftime(formats['created_at']) + if 'level' in formats: + kwargs['level'] = formats['level'].format(self._item.level) + if 'msg' in formats: + kwargs['msg'] = formats['msg'].format(self._item.msg) + + if 'actor' in formats and self._item.task: + kwargs['actor'] = formats['actor'].format(self._item.task.actor) + if 'execution' in formats: + kwargs['execution'] = formats['execution'].format(self._item.execution) + + # If no format was supplied just print out the original msg. + msg = formats.get('main_msg', '{item.msg}').format(**kwargs) + + # Add the exception and the error msg. + if self._item.traceback and env.logging.verbosity_level >= logger.MEDIUM_VERBOSE: + msg += os.linesep + '------>' + for line in self._item.traceback.splitlines(True): + msg += '\t' + '|' + line + + return msg + + def log(self, *args, **kwargs): + return getattr(env.logging.logger, self._item.level.lower())(self, *args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/cli/logger.py ---------------------------------------------------------------------- diff --git a/aria/cli/logger.py b/aria/cli/logger.py index 2f012d9..6811f3a 100644 --- a/aria/cli/logger.py +++ b/aria/cli/logger.py @@ -112,3 +112,21 @@ class Logging(object): self._all_loggers.append(logger_name) dictconfig.dictConfig(logger_dict) + + +class ModelLogConsumer(object): + + def __init__(self, model_storage, execution_id, filters=None, sort=None): + self._last_visited_id = 0 + self._model_storage = model_storage + self._execution_id = execution_id + self._additional_filters = filters or {} + self._sort = sort or {} + + def __iter__(self): + filters = dict(execution_fk=self._execution_id, id=dict(gt=self._last_visited_id)) + filters.update(self._additional_filters) + + for log in self._model_storage.log.iter(filters=filters, sort=self._sort): + self._last_visited_id = log.id + yield log http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index dd54264..d6a06d0 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -177,10 +177,12 @@ class _SQLAlchemyHandler(logging.Handler): log = self._cls( execution_fk=self._execution_id, task_fk=record.task_id, - actor=record.prefix, level=record.levelname, msg=str(record.msg), created_at=created_at, + + # Not mandatory. + traceback=getattr(record, 'traceback', None) ) self._session.add(log) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index a2f041b..c2cf6b1 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -413,7 +413,9 @@ class LogBase(ModelMixin): level = Column(String) msg = Column(String) created_at = Column(DateTime, index=True) - actor = Column(String) + + # In case of failed execution + traceback = Column(Text) # region foreign keys @@ -427,6 +429,9 @@ class LogBase(ModelMixin): # endregion + def __str__(self): + return self.msg + def __repr__(self): - return "<{self.created_at}: [{self.level}] @{self.actor}> {msg}".format( - self=self, msg=self.msg[:50]) + name = (self.task.actor if self.task else self.execution).name + return '{name}: {self.msg}'.format(name=name, self=self) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 15843db..67dffcd 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -38,43 +38,44 @@ class BaseContext(object): """ class PrefixedLogger(object): - def __init__(self, logger, prefix='', task_id=None): - self._logger = logger - self._prefix = prefix + def __init__(self, base_logger, task_id=None): + self._logger = base_logger self._task_id = task_id def __getattr__(self, item): if item.upper() in logging._levelNames: - return partial(getattr(self._logger, item), - extra={'prefix': self._prefix, 'task_id': self._task_id}) + return partial(self._logger_with_task_id, _level=item) else: return getattr(self._logger, item) - def __init__( - self, - name, - service_id, - execution_id, - model_storage, - resource_storage, - workdir=None, - **kwargs): + def _logger_with_task_id(self, *args, **kwargs): + level = kwargs.pop('_level') + kwargs.setdefault('extra', {})['task_id'] = self._task_id + return getattr(self._logger, level)(*args, **kwargs) + + def __init__(self, + name, + service_id, + model_storage, + resource_storage, + execution_id, + workdir=None, + **kwargs): super(BaseContext, self).__init__(**kwargs) self._name = name self._id = generate_uuid(variant='uuid') self._model = model_storage self._resource = resource_storage self._service_id = service_id - self._execution_id = execution_id self._workdir = workdir + self._execution_id = execution_id self.logger = None def _register_logger(self, level=None, task_id=None): self.logger = self.PrefixedLogger( - logging.getLogger(aria_logger.TASK_LOGGER_NAME), self.logging_id, task_id=task_id) + logging.getLogger(aria_logger.TASK_LOGGER_NAME), task_id=task_id) self.logger.setLevel(level or logging.DEBUG) if not self.logger.handlers: - self.logger.addHandler(aria_logger.create_console_log_handler()) self.logger.addHandler(self._get_sqla_handler()) def _get_sqla_handler(self): @@ -104,10 +105,6 @@ class BaseContext(object): self.logger.removeHandler(handler) @property - def logging_id(self): - raise NotImplementedError - - @property def model(self): """ Access to the model storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index c7d8246..c383958 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -29,20 +29,8 @@ class BaseOperationContext(BaseContext): Context object used during operation creation and execution """ - def __init__(self, - name, - model_storage, - resource_storage, - service_id, - task_id, - actor_id, - **kwargs): - super(BaseOperationContext, self).__init__( - name=name, - model_storage=model_storage, - resource_storage=resource_storage, - service_id=service_id, - **kwargs) + def __init__(self, task_id, actor_id, **kwargs): + super(BaseOperationContext, self).__init__(**kwargs) self._task_id = task_id self._actor_id = actor_id self._thread_local = threading.local() @@ -55,10 +43,6 @@ class BaseOperationContext(BaseContext): return '{name}({0})'.format(details, name=self.name) @property - def logging_id(self): - raise NotImplementedError - - @property def task(self): """ The task in the model storage @@ -119,10 +103,6 @@ class NodeOperationContext(BaseOperationContext): """ @property - def logging_id(self): - return self.node.name or self.node.id - - @property def node_template(self): """ the node of the current operation @@ -145,11 +125,6 @@ class RelationshipOperationContext(BaseOperationContext): """ @property - def logging_id(self): - return '{0}->{1}'.format(self.source_node.name or self.source_node.id, - self.target_node.name or self.target_node.id) - - @property def source_node_template(self): """ The source node http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 667d22f..920b237 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -50,8 +50,8 @@ class WorkflowContext(BaseContext): name=self.__class__.__name__, self=self)) @property - def logging_id(self): - return '{0}[{1}]'.format(self._workflow_name, self._execution_id) + def workflow_name(self): + return self._workflow_name @property def execution(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 7b043c5..3148a94 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -90,8 +90,12 @@ class WorkflowRunner(object): tasks_graph=self._tasks_graph) @property + def execution_id(self): + return self._execution_id + + @property def execution(self): - return self._model_storage.execution.get(self._execution_id) + return self._model_storage.execution.get(self.execution_id) @property def service(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 155d0ee..a111247 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -20,6 +20,7 @@ The workflow engine. Executes workflows import time from datetime import datetime +import logging import networkx from aria import logger @@ -40,6 +41,7 @@ class Engine(logger.LoggerMixin): def __init__(self, executor, workflow_context, tasks_graph, **kwargs): super(Engine, self).__init__(**kwargs) + self.logger.addHandler(logging.NullHandler()) self._workflow_context = workflow_context self._execution_graph = networkx.DiGraph() self._executor = executor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index e831bfe..1bc5a1e 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -26,41 +26,43 @@ from .. import events @events.start_task_signal.connect def _start_task_handler(task, **kwargs): - task.context.logger.debug('Event: Starting task: {task.name}'.format(task=task)) + task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} started...' + .format(actor=task.actor, task=task)) @events.on_success_task_signal.connect def _success_task_handler(task, **kwargs): - task.context.logger.debug('Event: Task success: {task.name}'.format(task=task)) + task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} successful' + .format(actor=task.actor, task=task)) @events.on_failure_task_signal.connect -def _failure_operation_handler(task, exception, **kwargs): - error = '{0}: {1}'.format(type(exception).__name__, exception) - task.context.logger.error('Event: Task failure: {task.name} [{error}]'.format( - task=task, error=error)) - +def _failure_operation_handler(task, traceback, **kwargs): + task.context.logger.error( + '{actor.name} {task.interface_name}.{task.operation_name} failed' + .format(actor=task.actor, task=task), extra=dict(traceback=traceback) + ) @events.start_workflow_signal.connect def _start_workflow_handler(context, **kwargs): - context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context)) + context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context)) @events.on_failure_workflow_signal.connect def _failure_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context)) @events.on_success_workflow_signal.connect def _success_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context)) @events.on_cancelled_workflow_signal.connect def _cancel_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context)) @events.on_cancelling_workflow_signal.connect def _cancelling_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context)) + context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 4ae046d..39becef 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -44,8 +44,8 @@ class BaseExecutor(logger.LoggerMixin): events.start_task_signal.send(task) @staticmethod - def _task_failed(task, exception): - events.on_failure_task_signal.send(task, exception=exception) + def _task_failed(task, exception, traceback=None): + events.on_failure_task_signal.send(task, exception=exception, traceback=traceback) @staticmethod def _task_succeeded(task): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 3c2b5fe..f744f54 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -42,13 +42,17 @@ import pickle import jsonpickle import aria -from aria.extension import process_executor -from aria.utils import imports -from aria.utils import exceptions from aria.orchestrator.workflows.executor import base from aria.storage import instrumentation -from aria.modeling import types as modeling_types -from aria.modeling.models import Parameter +from aria.extension import process_executor +from aria.utils import ( + imports, + exceptions +) +from aria.modeling import ( + types as modeling_types, + models +) _IS_WIN = os.name == 'nt' @@ -149,7 +153,7 @@ class ProcessExecutor(base.BaseExecutor): return { 'task_id': task.id, 'implementation': task.implementation, - 'operation_inputs': Parameter.unwrap_dict(task.inputs), + 'operation_inputs': models.Parameter.unwrap_dict(task.inputs), 'port': self._server_port, 'context': task.context.serialization_dict, } @@ -234,9 +238,10 @@ class ProcessExecutor(base.BaseExecutor): except BaseException as e: e.message += 'Task failed due to {0}.'.format(request['exception']) + \ UPDATE_TRACKED_CHANGES_FAILED_STR - self._task_failed(task, exception=e) + self._task_failed( + task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info())) else: - self._task_failed(task, exception=request['exception']) + self._task_failed(task, exception=request['exception'], traceback=request['traceback']) def _handle_apply_tracked_changes_request(self, task_id, request, response): task = self._tasks[task_id] @@ -320,6 +325,7 @@ class _Messenger(object): 'type': type, 'task_id': self.task_id, 'exception': exceptions.wrap_if_needed(exception), + 'traceback': exceptions.get_exception_as_string(*sys.exc_info()), 'tracked_changes': tracked_changes }) response = _recv_message(sock) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 8b443cc..ad5e41a 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -20,7 +20,9 @@ Thread based executor import Queue import threading -from aria.utils import imports +import sys + +from aria.utils import imports, exceptions from .base import BaseExecutor from ....modeling.models import Parameter @@ -64,7 +66,9 @@ class ThreadExecutor(BaseExecutor): task_func(ctx=task.context, **inputs) self._task_succeeded(task) except BaseException as e: - self._task_failed(task, exception=e) + self._task_failed(task, + exception=e, + traceback=exceptions.get_exception_as_string(*sys.exc_info())) # Daemon threads except BaseException as e: pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/tests/.pylintrc ---------------------------------------------------------------------- diff --git a/tests/.pylintrc b/tests/.pylintrc index eead6e8..9795bfc 100644 --- a/tests/.pylintrc +++ b/tests/.pylintrc @@ -369,7 +369,7 @@ max-statements=50 max-parents=7 # Maximum number of attributes for a class (see R0902). -max-attributes=15 +max-attributes=25 # Minimum number of public methods for a class (see R0903). min-public-methods=0 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index ae1e83e..c05831a 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -12,3 +12,54 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import uuid +import logging +from collections import namedtuple +from contextlib import contextmanager + +from aria.modeling import models + + +class MockTask(object): + + INFINITE_RETRIES = models.Task.INFINITE_RETRIES + + def __init__(self, implementation, inputs=None, plugin=None): + self.implementation = self.name = implementation + self.plugin_fk = plugin.id if plugin else None + self.plugin = plugin or None + self.inputs = inputs or {} + self.states = [] + self.exception = None + self.id = str(uuid.uuid4()) + self.logger = logging.getLogger() + self.context = MockContext() + self.retry_count = 0 + self.max_attempts = 1 + self.ignore_failure = False + self.interface_name = 'interface_name' + self.operation_name = 'operation_name' + self.actor = namedtuple('actor', 'name')(name='actor_name') + self.model_task = None + + for state in models.Task.STATES: + setattr(self, state.upper(), state) + + @contextmanager + def _update(self): + yield self + + +class MockContext(object): + + def __init__(self): + self.logger = logging.getLogger('mock_logger') + self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) + self.serialization_dict = {'context_cls': self.__class__, 'context': {}} + + def __getattr__(self, item): + return None + + @classmethod + def deserialize_from_dict(cls, **kwargs): + return cls() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index a7619de..d4482ae 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -13,9 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -import uuid -from contextlib import contextmanager import pytest import retrying @@ -37,14 +34,19 @@ from aria.orchestrator.workflows.executor import ( ) import tests +from . import MockTask + + +def _get_implementation(func): + return '{module}.{func.__name__}'.format(module=__name__, func=func) def test_execute(executor): expected_value = 'value' - successful_task = MockTask(mock_successful_task) - failing_task = MockTask(mock_failing_task) - task_with_inputs = MockTask(mock_task_with_input, inputs={'input': models.Parameter.wrap( - 'input', 'value')}) + successful_task = MockTask(_get_implementation(mock_successful_task)) + failing_task = MockTask(_get_implementation(mock_failing_task)) + task_with_inputs = MockTask(_get_implementation(mock_task_with_input), + inputs={'input': models.Parameter.wrap('input', 'value')}) for task in [successful_task, failing_task, task_with_inputs]: executor.execute(task) @@ -81,54 +83,6 @@ class MockException(Exception): pass -class MockContext(object): - - def __init__(self, *args, **kwargs): - self.logger = logging.getLogger() - self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) - self.serialization_dict = {'context_cls': self.__class__, 'context': {}} - - def __getattr__(self, item): - return None - - @classmethod - def deserialize_from_dict(cls, **kwargs): - return cls() - - -class MockTask(object): - - INFINITE_RETRIES = models.Task.INFINITE_RETRIES - - def __init__(self, func, inputs=None): - self.states = [] - self.exception = None - self.id = str(uuid.uuid4()) - name = func.__name__ - implementation = '{module}.{name}'.format( - module=__name__, - name=name) - self.implementation = implementation - self.logger = logging.getLogger() - self.name = name - self.inputs = inputs or {} - self.context = MockContext() - self.retry_count = 0 - self.max_attempts = 1 - self.plugin_fk = None - self.ignore_failure = False - self.interface_name = 'interface_name' - self.operation_name = 'operation_name' - self.model_task = None - - for state in models.Task.STATES: - setattr(self, state.upper(), state) - - @contextmanager - def _update(self): - yield self - - @pytest.fixture(params=[ (thread.ThreadExecutor, {'pool_size': 1}), (thread.ThreadExecutor, {'pool_size': 2}), http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2f038276/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 839b9f1..b353518 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -15,13 +15,10 @@ import logging import os -import uuid import Queue -from contextlib import contextmanager import pytest -from aria.modeling import models as aria_models from aria.orchestrator import events from aria.utils.plugin import create as create_plugin from aria.orchestrator.workflows.executor import process @@ -33,17 +30,17 @@ from tests.fixtures import ( # pylint: disable=unused-import plugin_manager, fs_model as model ) +from . import MockTask class TestProcessExecutor(object): def test_plugin_execution(self, executor, mock_plugin): - task = MockTask(plugin=mock_plugin, - implementation='mock_plugin1.operation') + task = MockTask('mock_plugin1.operation', plugin=mock_plugin) queue = Queue.Queue() - def handler(_, exception=None): + def handler(_, exception=None, **kwargs): queue.put(exception) events.on_success_task_signal.connect(handler) @@ -100,31 +97,3 @@ class MockContext(object): @classmethod def deserialize_from_dict(cls, **kwargs): return cls() - - -class MockTask(object): - - INFINITE_RETRIES = aria_models.Task.INFINITE_RETRIES - - def __init__(self, plugin, implementation): - self.id = str(uuid.uuid4()) - self.implementation = implementation - self.logger = logging.getLogger() - self.name = implementation - self.inputs = {} - self.context = MockContext() - self.retry_count = 0 - self.max_attempts = 1 - self.plugin_fk = plugin.id - self.plugin = plugin - self.ignore_failure = False - self.interface_name = 'interface_name' - self.operation_name = 'operation_name' - self.model_task = None - - for state in aria_models.Task.STATES: - setattr(self, state.upper(), state) - - @contextmanager - def _update(self): - yield self