Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-138-Make-logging-more-informative ccdd32741 -> 29bc84be8 (forced update)
ARIA-138-Make-logging-more-informative Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/29bc84be Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/29bc84be Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/29bc84be Branch: refs/heads/ARIA-138-Make-logging-more-informative Commit: 29bc84be8cd0f05e1a741d39b6a1b8d3825fe108 Parents: f73c121 Author: max-orlov <ma...@gigaspaces.com> Authored: Wed Apr 19 17:14:15 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu Apr 20 15:30:03 2017 +0300 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 16 +++- aria/cli/commands/logs.py | 21 ++---- aria/cli/execution_logging.py | 78 ++++++++++++++++++++ aria/cli/logger.py | 18 +++++ aria/logger.py | 8 +- aria/modeling/orchestration.py | 11 ++- aria/orchestrator/context/common.py | 47 ++++++------ aria/orchestrator/context/operation.py | 29 +------- aria/orchestrator/context/workflow.py | 4 +- aria/orchestrator/workflow_runner.py | 6 +- aria/orchestrator/workflows/events_logging.py | 35 ++++++--- aria/orchestrator/workflows/executor/base.py | 4 +- aria/orchestrator/workflows/executor/dry.py | 18 +++-- aria/orchestrator/workflows/executor/process.py | 15 ++-- aria/orchestrator/workflows/executor/thread.py | 8 +- aria/storage/core.py | 6 +- tests/.pylintrc | 2 +- .../orchestrator/workflows/executor/__init__.py | 51 +++++++++++++ .../workflows/executor/test_executor.py | 64 +++------------- .../workflows/executor/test_process_executor.py | 37 +--------- 20 files changed, 280 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index e100f0d..6a1f02a 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -18,6 +18,8 @@ import os from .. import helptexts from .. import table from .. import utils +from .. import logger as cli_logger +from .. import execution_logging from ..core import aria from ...modeling.models import Execution from ...orchestrator.workflow_runner import WorkflowRunner @@ -141,12 +143,19 @@ def start(workflow_name, logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) execution_thread.start() + + log_iterator = cli_logger.ModelLogIterator(model_storage, workflow_runner.execution_id) try: while execution_thread.is_alive(): - # using join without a timeout blocks and ignores KeyboardInterrupt + execution_logging.log_list(log_iterator) execution_thread.join(1) + except KeyboardInterrupt: - _cancel_execution(workflow_runner, execution_thread, logger) + _cancel_execution(workflow_runner, execution_thread, logger, log_iterator) + + # It might be the case where some logs were written and the execution was terminated, thus we + # need to drain the remaining logs. + execution_logging.log_list(log_iterator) # raise any errors from the execution thread (note these are not workflow execution errors) execution_thread.raise_error_if_exists() @@ -161,11 +170,12 @@ def start(workflow_name, model_storage.execution.delete(execution) -def _cancel_execution(workflow_runner, execution_thread, logger): +def _cancel_execution(workflow_runner, execution_thread, logger, log_iterator): logger.info('Cancelling execution. Press Ctrl+C again to force-cancel') try: workflow_runner.cancel() while execution_thread.is_alive(): + execution_logging.log_list(log_iterator) execution_thread.join(1) except KeyboardInterrupt: logger.info('Force-cancelling execution') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/cli/commands/logs.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/logs.py b/aria/cli/commands/logs.py index 6c83347..79aff07 100644 --- a/aria/cli/commands/logs.py +++ b/aria/cli/commands/logs.py @@ -12,13 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from .. import utils +from .. import execution_logging +from ..logger import ModelLogIterator from ..core import aria @aria.group(name='logs') -@aria.options.verbose() def logs(): """Show logs from workflow executions """ @@ -31,19 +30,15 @@ def logs(): @aria.options.verbose() @aria.pass_model_storage @aria.pass_logger -def list(execution_id, - model_storage, - logger): +def list(execution_id, model_storage, logger): """Display logs for an execution """ logger.info('Listing logs for execution id {0}'.format(execution_id)) - logs_list = model_storage.log.list(filters=dict(execution_fk=execution_id), - sort=utils.storage_sort_param('created_at', False)) - # TODO: print logs nicely - if logs_list: - for log in logs_list: - logger.info(log) - else: + log_iterator = ModelLogIterator(model_storage, execution_id) + + any_logs = execution_logging.log_list(log_iterator) + + if not any_logs: logger.info('\tNo logs') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/cli/execution_logging.py ---------------------------------------------------------------------- diff --git a/aria/cli/execution_logging.py b/aria/cli/execution_logging.py new file mode 100644 index 0000000..8baf6d7 --- /dev/null +++ b/aria/cli/execution_logging.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from StringIO import StringIO + +from . import logger +from .env import env + +DEFAULT_FORMATTING = { + logger.NO_VERBOSE: {'message': '{item.msg}'}, + logger.LOW_VERBOSE: { + 'message': '{timestamp} | {item.level[0]} | {item.msg}', + 'timestamp': '%H:%M:%S' + }, + logger.MEDIUM_VERBOSE: { + 'message': '{timestamp} | {item.level[0]} | {implementation} | {item.msg} ', + 'timestamp': '%H:%M:%S' + }, + logger.HIGH_VERBOSE: { + 'message': '{timestamp} | {item.level[0]} | {implementation}({inputs}) | {item.msg} ', + 'timestamp': '%H:%M:%S' + }, +} + + +def _str(item, formats=None): + # If no formats are passed we revert to the default formats (per level) + formats = formats or {} + formatting = formats.get(env.logging.verbosity_level, + DEFAULT_FORMATTING[env.logging.verbosity_level]) + msg = StringIO() + + formatting_kwargs = dict(item=item) + + if item.task: + formatting_kwargs['implementation'] = item.task.implementation + formatting_kwargs['inputs'] = dict(i.unwrap() for i in item.task.inputs.values()) + else: + formatting_kwargs['implementation'] = item.execution.workflow_name + formatting_kwargs['inputs'] = dict(i.unwrap() for i in item.execution.inputs.values()) + + if 'timestamp' in formatting: + formatting_kwargs['timestamp'] = item.created_at.strftime(formatting['timestamp']) + else: + formatting_kwargs['timestamp'] = item.created_at + + msg.write(formatting['message'].format(**formatting_kwargs)) + + # Add the exception and the error msg. + if item.traceback and env.logging.verbosity_level >= logger.MEDIUM_VERBOSE: + for line in item.traceback.splitlines(True): + msg.write('\t' + '|' + line) + + return msg.getvalue() + + +def log(item, *args, **kwargs): + return getattr(env.logging.logger, item.level.lower())(_str(item), *args, **kwargs) + + +def log_list(iterator): + any_logs = False + for item in iterator: + log(item) + any_logs = True + return any_logs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/cli/logger.py ---------------------------------------------------------------------- diff --git a/aria/cli/logger.py b/aria/cli/logger.py index 1ffa918..c240f02 100644 --- a/aria/cli/logger.py +++ b/aria/cli/logger.py @@ -112,3 +112,21 @@ class Logging(object): log.setLevel(level) dictconfig.dictConfig(logger_dict) + + +class ModelLogIterator(object): + + def __init__(self, model_storage, execution_id, filters=None, sort=None): + self._last_visited_id = 0 + self._model_storage = model_storage + self._execution_id = execution_id + self._additional_filters = filters or {} + self._sort = sort or {} + + def __iter__(self): + filters = dict(execution_fk=self._execution_id, id=dict(gt=self._last_visited_id)) + filters.update(self._additional_filters) + + for log in self._model_storage.log.iter(filters=filters, sort=self._sort): + self._last_visited_id = log.id + yield log http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index dd54264..8e15f5b 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -51,6 +51,10 @@ class LoggerMixin(object): def __init__(self, *args, **kwargs): self.logger_name = self.logger_name or self.__class__.__name__ self.logger = logging.getLogger('{0}.{1}'.format(_base_logger.name, self.logger_name)) + # Set the logger handler of any object derived from LoggerMixing to NullHandler. + # This is since the absence of a handler shows up while using the CLI in the form of: + # `No handlers could be found for logger "..."`. + self.logger.addHandler(NullHandler()) self.logger.setLevel(self.logger_level) super(LoggerMixin, self).__init__(*args, **kwargs) @@ -177,10 +181,12 @@ class _SQLAlchemyHandler(logging.Handler): log = self._cls( execution_fk=self._execution_id, task_fk=record.task_id, - actor=record.prefix, level=record.levelname, msg=str(record.msg), created_at=created_at, + + # Not mandatory. + traceback=getattr(record, 'traceback', None) ) self._session.add(log) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 01ab2e8..b9a75e9 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -413,7 +413,9 @@ class LogBase(ModelMixin): level = Column(String) msg = Column(String) created_at = Column(DateTime, index=True) - actor = Column(String) + + # In case of failed execution + traceback = Column(Text) # region foreign keys @@ -427,6 +429,9 @@ class LogBase(ModelMixin): # endregion + def __str__(self): + return self.msg + def __repr__(self): - return "<{self.created_at}: [{self.level}] @{self.actor}> {msg}".format( - self=self, msg=self.msg[:50]) + name = (self.task.actor if self.task else self.execution).name + return '{name}: {self.msg}'.format(name=name, self=self) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 15843db..64ef9a4 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -38,43 +38,44 @@ class BaseContext(object): """ class PrefixedLogger(object): - def __init__(self, logger, prefix='', task_id=None): - self._logger = logger - self._prefix = prefix + def __init__(self, base_logger, task_id=None): + self._logger = base_logger self._task_id = task_id - def __getattr__(self, item): - if item.upper() in logging._levelNames: - return partial(getattr(self._logger, item), - extra={'prefix': self._prefix, 'task_id': self._task_id}) + def __getattr__(self, attribute): + if attribute.upper() in logging._levelNames: + return partial(self._logger_with_task_id, _level=attribute) else: - return getattr(self._logger, item) - - def __init__( - self, - name, - service_id, - execution_id, - model_storage, - resource_storage, - workdir=None, - **kwargs): + return getattr(self._logger, attribute) + + def _logger_with_task_id(self, *args, **kwargs): + level = kwargs.pop('_level') + kwargs.setdefault('extra', {})['task_id'] = self._task_id + return getattr(self._logger, level)(*args, **kwargs) + + def __init__(self, + name, + service_id, + model_storage, + resource_storage, + execution_id, + workdir=None, + **kwargs): super(BaseContext, self).__init__(**kwargs) self._name = name self._id = generate_uuid(variant='uuid') self._model = model_storage self._resource = resource_storage self._service_id = service_id - self._execution_id = execution_id self._workdir = workdir + self._execution_id = execution_id self.logger = None def _register_logger(self, level=None, task_id=None): self.logger = self.PrefixedLogger( - logging.getLogger(aria_logger.TASK_LOGGER_NAME), self.logging_id, task_id=task_id) + logging.getLogger(aria_logger.TASK_LOGGER_NAME), task_id=task_id) self.logger.setLevel(level or logging.DEBUG) if not self.logger.handlers: - self.logger.addHandler(aria_logger.create_console_log_handler()) self.logger.addHandler(self._get_sqla_handler()) def _get_sqla_handler(self): @@ -104,10 +105,6 @@ class BaseContext(object): self.logger.removeHandler(handler) @property - def logging_id(self): - raise NotImplementedError - - @property def model(self): """ Access to the model storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index c7d8246..c383958 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -29,20 +29,8 @@ class BaseOperationContext(BaseContext): Context object used during operation creation and execution """ - def __init__(self, - name, - model_storage, - resource_storage, - service_id, - task_id, - actor_id, - **kwargs): - super(BaseOperationContext, self).__init__( - name=name, - model_storage=model_storage, - resource_storage=resource_storage, - service_id=service_id, - **kwargs) + def __init__(self, task_id, actor_id, **kwargs): + super(BaseOperationContext, self).__init__(**kwargs) self._task_id = task_id self._actor_id = actor_id self._thread_local = threading.local() @@ -55,10 +43,6 @@ class BaseOperationContext(BaseContext): return '{name}({0})'.format(details, name=self.name) @property - def logging_id(self): - raise NotImplementedError - - @property def task(self): """ The task in the model storage @@ -119,10 +103,6 @@ class NodeOperationContext(BaseOperationContext): """ @property - def logging_id(self): - return self.node.name or self.node.id - - @property def node_template(self): """ the node of the current operation @@ -145,11 +125,6 @@ class RelationshipOperationContext(BaseOperationContext): """ @property - def logging_id(self): - return '{0}->{1}'.format(self.source_node.name or self.source_node.id, - self.target_node.name or self.target_node.id) - - @property def source_node_template(self): """ The source node http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 667d22f..920b237 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -50,8 +50,8 @@ class WorkflowContext(BaseContext): name=self.__class__.__name__, self=self)) @property - def logging_id(self): - return '{0}[{1}]'.format(self._workflow_name, self._execution_id) + def workflow_name(self): + return self._workflow_name @property def execution(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 1ea60a1..8f25cce 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -90,8 +90,12 @@ class WorkflowRunner(object): tasks_graph=self._tasks_graph) @property + def execution_id(self): + return self._execution_id + + @property def execution(self): - return self._model_storage.execution.get(self._execution_id) + return self._model_storage.execution.get(self.execution_id) @property def service(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index e831bfe..7d15c81 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -22,45 +22,56 @@ Implementation of logger handlers for workflow and operation events. """ from .. import events +from ... import modeling + + +def _get_task_name(task): + if isinstance(task.actor, modeling.model_bases.service_instance.RelationshipBase): + return '{source_node.name}->{target_node.name}'.format( + source_node=task.actor.source_node, target_node=task.actor.target_node) + else: + return task.actor.name @events.start_task_signal.connect def _start_task_handler(task, **kwargs): - task.context.logger.debug('Event: Starting task: {task.name}'.format(task=task)) + task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...' + .format(name=_get_task_name(task), task=task)) @events.on_success_task_signal.connect def _success_task_handler(task, **kwargs): - task.context.logger.debug('Event: Task success: {task.name}'.format(task=task)) + task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful' + .format(name=_get_task_name(task), task=task)) @events.on_failure_task_signal.connect -def _failure_operation_handler(task, exception, **kwargs): - error = '{0}: {1}'.format(type(exception).__name__, exception) - task.context.logger.error('Event: Task failure: {task.name} [{error}]'.format( - task=task, error=error)) - +def _failure_operation_handler(task, traceback, **kwargs): + task.context.logger.error( + '{name} {task.interface_name}.{task.operation_name} failed' + .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback) + ) @events.start_workflow_signal.connect def _start_workflow_handler(context, **kwargs): - context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context)) + context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context)) @events.on_failure_workflow_signal.connect def _failure_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context)) @events.on_success_workflow_signal.connect def _success_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context)) @events.on_cancelled_workflow_signal.connect def _cancel_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context)) @events.on_cancelling_workflow_signal.connect def _cancelling_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context)) + context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 4ae046d..39becef 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -44,8 +44,8 @@ class BaseExecutor(logger.LoggerMixin): events.start_task_signal.send(task) @staticmethod - def _task_failed(task, exception): - events.on_failure_task_signal.send(task, exception=exception) + def _task_failed(task, exception, traceback=None): + events.on_failure_task_signal.send(task, exception=exception, traceback=traceback) @staticmethod def _task_succeeded(task): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index d894b25..e1261bb 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -34,15 +34,19 @@ class DryExecutor(BaseExecutor): task.started_at = datetime.utcnow() task.status = task.STARTED - actor_type = type(task.actor).__name__.lower() - implementation = '{0} > '.format(task.plugin) if task.plugin else '' - implementation += task.implementation - inputs = dict(inp.unwrap() for inp in task.inputs.values()) + if hasattr(task.actor, 'source_node'): + name = '{source_node.name}->{target_node.name}'.format( + source_node=task.actor.source_node, target_node=task.actor.target_node) + else: + name = task.actor.name task.context.logger.info( - 'Executing {actor_type} {task.actor.name} operation {task.interface_name} ' - '{task.operation_name}: {implementation} (Inputs: {inputs})' - .format(actor_type=actor_type, task=task, implementation=implementation, inputs=inputs)) + '<dry> {name} {task.interface_name}.{task.operation_name} started...' + .format(name=name, task=task)) + + task.context.logger.info( + '<dry> {name} {task.interface_name}.{task.operation_name} successful' + .format(name=name, task=task)) # updating the task manually instead of calling self._task_succeeded(task), # to avoid any side effects raising that event might cause http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 851d78e..2378e0a 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -42,13 +42,16 @@ import pickle import jsonpickle import aria -from aria.extension import process_executor -from aria.utils import imports -from aria.utils import exceptions from aria.orchestrator.workflows.executor import base from aria.storage import instrumentation +from aria.extension import process_executor +from aria.utils import ( + imports, + exceptions +) from aria.modeling import types as modeling_types + _IS_WIN = os.name == 'nt' _INT_FMT = 'I' @@ -233,9 +236,10 @@ class ProcessExecutor(base.BaseExecutor): except BaseException as e: e.message += 'Task failed due to {0}.'.format(request['exception']) + \ UPDATE_TRACKED_CHANGES_FAILED_STR - self._task_failed(task, exception=e) + self._task_failed( + task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info())) else: - self._task_failed(task, exception=request['exception']) + self._task_failed(task, exception=request['exception'], traceback=request['traceback']) def _handle_apply_tracked_changes_request(self, task_id, request, response): task = self._tasks[task_id] @@ -319,6 +323,7 @@ class _Messenger(object): 'type': type, 'task_id': self.task_id, 'exception': exceptions.wrap_if_needed(exception), + 'traceback': exceptions.get_exception_as_string(*sys.exc_info()), 'tracked_changes': tracked_changes }) response = _recv_message(sock) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index f422592..836b2bf 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -20,7 +20,9 @@ Thread based executor import Queue import threading -from aria.utils import imports +import sys + +from aria.utils import imports, exceptions from .base import BaseExecutor @@ -63,7 +65,9 @@ class ThreadExecutor(BaseExecutor): task_func(ctx=task.context, **inputs) self._task_succeeded(task) except BaseException as e: - self._task_failed(task, exception=e) + self._task_failed(task, + exception=e, + traceback=exceptions.get_exception_as_string(*sys.exc_info())) # Daemon threads except BaseException as e: pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py index 8caca66..8302fc9 100644 --- a/aria/storage/core.py +++ b/aria/storage/core.py @@ -38,7 +38,7 @@ API: * StorageDriver - class, abstract model implementation. """ -from aria.logger import LoggerMixin, NullHandler +from aria.logger import LoggerMixin from . import sql_mapi __all__ = ( @@ -71,10 +71,6 @@ class Storage(LoggerMixin): :param kwargs: """ super(Storage, self).__init__(**kwargs) - # Set the logger handler of any storage object to NullHandler. - # This is since the absence of a handler shows up while using the CLI in the form of: - # `No handlers could be found for logger "aria.ResourceStorage"`. - self.logger.addHandler(NullHandler()) self.api = api_cls self.registered = {} self._initiator = initiator http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/tests/.pylintrc ---------------------------------------------------------------------- diff --git a/tests/.pylintrc b/tests/.pylintrc index eead6e8..9795bfc 100644 --- a/tests/.pylintrc +++ b/tests/.pylintrc @@ -369,7 +369,7 @@ max-statements=50 max-parents=7 # Maximum number of attributes for a class (see R0902). -max-attributes=15 +max-attributes=25 # Minimum number of public methods for a class (see R0903). min-public-methods=0 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index ae1e83e..c05831a 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -12,3 +12,54 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import uuid +import logging +from collections import namedtuple +from contextlib import contextmanager + +from aria.modeling import models + + +class MockTask(object): + + INFINITE_RETRIES = models.Task.INFINITE_RETRIES + + def __init__(self, implementation, inputs=None, plugin=None): + self.implementation = self.name = implementation + self.plugin_fk = plugin.id if plugin else None + self.plugin = plugin or None + self.inputs = inputs or {} + self.states = [] + self.exception = None + self.id = str(uuid.uuid4()) + self.logger = logging.getLogger() + self.context = MockContext() + self.retry_count = 0 + self.max_attempts = 1 + self.ignore_failure = False + self.interface_name = 'interface_name' + self.operation_name = 'operation_name' + self.actor = namedtuple('actor', 'name')(name='actor_name') + self.model_task = None + + for state in models.Task.STATES: + setattr(self, state.upper(), state) + + @contextmanager + def _update(self): + yield self + + +class MockContext(object): + + def __init__(self): + self.logger = logging.getLogger('mock_logger') + self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) + self.serialization_dict = {'context_cls': self.__class__, 'context': {}} + + def __getattr__(self, item): + return None + + @classmethod + def deserialize_from_dict(cls, **kwargs): + return cls() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index a7619de..d4482ae 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -13,9 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -import uuid -from contextlib import contextmanager import pytest import retrying @@ -37,14 +34,19 @@ from aria.orchestrator.workflows.executor import ( ) import tests +from . import MockTask + + +def _get_implementation(func): + return '{module}.{func.__name__}'.format(module=__name__, func=func) def test_execute(executor): expected_value = 'value' - successful_task = MockTask(mock_successful_task) - failing_task = MockTask(mock_failing_task) - task_with_inputs = MockTask(mock_task_with_input, inputs={'input': models.Parameter.wrap( - 'input', 'value')}) + successful_task = MockTask(_get_implementation(mock_successful_task)) + failing_task = MockTask(_get_implementation(mock_failing_task)) + task_with_inputs = MockTask(_get_implementation(mock_task_with_input), + inputs={'input': models.Parameter.wrap('input', 'value')}) for task in [successful_task, failing_task, task_with_inputs]: executor.execute(task) @@ -81,54 +83,6 @@ class MockException(Exception): pass -class MockContext(object): - - def __init__(self, *args, **kwargs): - self.logger = logging.getLogger() - self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) - self.serialization_dict = {'context_cls': self.__class__, 'context': {}} - - def __getattr__(self, item): - return None - - @classmethod - def deserialize_from_dict(cls, **kwargs): - return cls() - - -class MockTask(object): - - INFINITE_RETRIES = models.Task.INFINITE_RETRIES - - def __init__(self, func, inputs=None): - self.states = [] - self.exception = None - self.id = str(uuid.uuid4()) - name = func.__name__ - implementation = '{module}.{name}'.format( - module=__name__, - name=name) - self.implementation = implementation - self.logger = logging.getLogger() - self.name = name - self.inputs = inputs or {} - self.context = MockContext() - self.retry_count = 0 - self.max_attempts = 1 - self.plugin_fk = None - self.ignore_failure = False - self.interface_name = 'interface_name' - self.operation_name = 'operation_name' - self.model_task = None - - for state in models.Task.STATES: - setattr(self, state.upper(), state) - - @contextmanager - def _update(self): - yield self - - @pytest.fixture(params=[ (thread.ThreadExecutor, {'pool_size': 1}), (thread.ThreadExecutor, {'pool_size': 2}), http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/29bc84be/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 839b9f1..b353518 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -15,13 +15,10 @@ import logging import os -import uuid import Queue -from contextlib import contextmanager import pytest -from aria.modeling import models as aria_models from aria.orchestrator import events from aria.utils.plugin import create as create_plugin from aria.orchestrator.workflows.executor import process @@ -33,17 +30,17 @@ from tests.fixtures import ( # pylint: disable=unused-import plugin_manager, fs_model as model ) +from . import MockTask class TestProcessExecutor(object): def test_plugin_execution(self, executor, mock_plugin): - task = MockTask(plugin=mock_plugin, - implementation='mock_plugin1.operation') + task = MockTask('mock_plugin1.operation', plugin=mock_plugin) queue = Queue.Queue() - def handler(_, exception=None): + def handler(_, exception=None, **kwargs): queue.put(exception) events.on_success_task_signal.connect(handler) @@ -100,31 +97,3 @@ class MockContext(object): @classmethod def deserialize_from_dict(cls, **kwargs): return cls() - - -class MockTask(object): - - INFINITE_RETRIES = aria_models.Task.INFINITE_RETRIES - - def __init__(self, plugin, implementation): - self.id = str(uuid.uuid4()) - self.implementation = implementation - self.logger = logging.getLogger() - self.name = implementation - self.inputs = {} - self.context = MockContext() - self.retry_count = 0 - self.max_attempts = 1 - self.plugin_fk = plugin.id - self.plugin = plugin - self.ignore_failure = False - self.interface_name = 'interface_name' - self.operation_name = 'operation_name' - self.model_task = None - - for state in aria_models.Task.STATES: - setattr(self, state.upper(), state) - - @contextmanager - def _update(self): - yield self