Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-138-Make-logging-more-informative 3cd7111e2 -> ba237ebad (forced update)
ARIA-138-Make-logging-more-informative Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ba237eba Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ba237eba Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ba237eba Branch: refs/heads/ARIA-138-Make-logging-more-informative Commit: ba237ebadc8e99c58bab41a373422f0e2832e42b Parents: 5b245b4 Author: max-orlov <ma...@gigaspaces.com> Authored: Thu Apr 6 11:54:44 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Wed Apr 12 18:42:04 2017 +0300 ---------------------------------------------------------------------- aria/cli/cli/aria.py | 2 +- aria/cli/commands/__init__.py | 1 + aria/cli/commands/executions.py | 12 +++- aria/cli/commands/logs.py | 24 ++++---- aria/cli/execution_logging.py | 58 ++++++++++++++++++++ aria/cli/logger.py | 18 ++++++ aria/logger.py | 4 +- aria/modeling/orchestration.py | 7 ++- aria/orchestrator/context/common.py | 37 ++++++------- aria/orchestrator/context/operation.py | 31 +---------- aria/orchestrator/context/workflow.py | 6 +- aria/orchestrator/workflow_runner.py | 8 ++- aria/orchestrator/workflows/core/engine.py | 2 + aria/orchestrator/workflows/events_logging.py | 26 +++++---- aria/orchestrator/workflows/executor/base.py | 4 +- aria/orchestrator/workflows/executor/process.py | 16 ++++-- aria/orchestrator/workflows/executor/thread.py | 8 ++- examples/hello-world/scripts/stop.sh | 3 + .../workflows/executor/test_executor.py | 2 + 19 files changed, 173 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/cli/cli/aria.py ---------------------------------------------------------------------- diff --git a/aria/cli/cli/aria.py b/aria/cli/cli/aria.py index 0177134..0843b4a 100644 --- a/aria/cli/cli/aria.py +++ b/aria/cli/cli/aria.py @@ -18,7 +18,7 @@ import sys import difflib import StringIO import traceback -from functools import wraps +from functools import wraps, partial import click http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/cli/commands/__init__.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/__init__.py b/aria/cli/commands/__init__.py index 7777791..9ca5e47 100644 --- a/aria/cli/commands/__init__.py +++ b/aria/cli/commands/__init__.py @@ -1,3 +1,4 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index 396985a..4bdc481 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from aria.cli import execution_logging from .. import utils +from .. import logger as cli_logger from ..table import print_data from ..cli import aria from ...modeling.models import Execution @@ -139,13 +141,19 @@ def start(workflow_name, logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) execution_thread.start() + + log_consumer = cli_logger.ModelLogConsumer(model_storage, workflow_runner.execution_id) try: while execution_thread.is_alive(): - # using join without a timeout blocks and ignores KeyboardInterrupt - execution_thread.join(1) + for log in log_consumer: + execution_logging.log(log) + except KeyboardInterrupt: _cancel_execution(workflow_runner, execution_thread, logger) + for log in log_consumer: + execution_logging.log(log) + # raise any errors from the execution thread (note these are not workflow execution errors) execution_thread.raise_error_if_exists() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/cli/commands/logs.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/logs.py b/aria/cli/commands/logs.py index f8873cd..4d5d4e2 100644 --- a/aria/cli/commands/logs.py +++ b/aria/cli/commands/logs.py @@ -12,13 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from .. import utils +from ..logger import ModelLogConsumer from ..cli import aria +from .. import execution_logging @aria.group(name='logs') -@aria.options.verbose() def logs(): """Show logs from workflow executions """ @@ -31,19 +30,18 @@ def logs(): @aria.options.verbose() @aria.pass_model_storage @aria.pass_logger -def list(execution_id, - model_storage, - logger): +def list(execution_id, model_storage, logger): """Display logs for an execution """ logger.info('Listing logs for execution id {0}'.format(execution_id)) - logs_list = model_storage.log.list(filters=dict(execution_fk=execution_id), - sort=utils.storage_sort_param('created_at', False)) - # TODO: print logs nicely - if logs_list: - for log in logs_list: - print log - else: + log_consumer = ModelLogConsumer(model_storage, execution_id) + any_logs = False + + for log in log_consumer: + execution_logging.log(log) + any_logs = True + + if not any_logs: logger.info('\tNo logs') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/cli/execution_logging.py ---------------------------------------------------------------------- diff --git a/aria/cli/execution_logging.py b/aria/cli/execution_logging.py new file mode 100644 index 0000000..9f83310 --- /dev/null +++ b/aria/cli/execution_logging.py @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from . import logger +from .env import env + + +def log(item): + + formats = { + logger.NO_VERBOSE: {'main_msg': '{item.msg}'}, + logger.LOW_VERBOSE: { + 'main_msg': '{created_at} | {item.level[0]} | {item.msg}', + 'created_at': '%H:%M:%S' + } + } + + # Only NO_VERBOSE and LOW_VERBOSE are configurable formats. configuring + # the low verbose level should affect any higher level. + formats = formats[min(env.logging.verbosity_level, logger.LOW_VERBOSE)] + + kwargs = dict(item=item) + if 'created_at' in formats: + kwargs['created_at'] = item.created_at.strftime(formats['created_at']) + if 'level' in formats: + kwargs['level'] = formats['level'].format(item.level) + if 'msg' in formats: + kwargs['msg'] = formats['msg'].format(item.msg) + + if 'actor' in formats and item.task: + kwargs['actor'] = formats['actor'].format(item.task.actor) + if 'execution' in formats: + kwargs['execution'] = formats['execution'].format(item.execution) + + # If no format was supplied just print out the original msg. + msg = formats.get('main_msg', '{item.msg}').format(**kwargs) + + # Add the exception and the error msg. + if item.traceback and env.logging.verbosity_level >= logger.MEDIUM_VERBOSE: + msg += os.linesep + '------>' + for line in item.traceback.splitlines(True): + msg += '\t' + '|' + line + + return getattr(env.logging.logger, item.level.lower())(msg) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/cli/logger.py ---------------------------------------------------------------------- diff --git a/aria/cli/logger.py b/aria/cli/logger.py index 289dbd3..4cf16b8 100644 --- a/aria/cli/logger.py +++ b/aria/cli/logger.py @@ -111,3 +111,21 @@ class Logging(object): self._all_loggers.append(logger_name) logging.config.dictConfig(logger_dict) + + +class ModelLogConsumer(object): + + def __init__(self, model_storage, execution_id, filters=None, sort=None): + self._last_visited_id = 0 + self._model_storage = model_storage + self._execution_id = execution_id + self._additional_filters = filters or {} + self._sort = sort or {} + + def __iter__(self): + filters = dict(execution_fk=self._execution_id, id=dict(gt=self._last_visited_id)) + filters.update(self._additional_filters) + + for log in self._model_storage.log.iter(filters=filters, sort=self._sort): + self._last_visited_id = log.id + yield log http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index bbb6c7a..61de132 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -168,10 +168,12 @@ class _SQLAlchemyHandler(logging.Handler): log = self._cls( execution_fk=self._execution_id, task_fk=record.task_id, - actor=record.prefix, level=record.levelname, msg=str(record.msg), created_at=created_at, + + # Not mandatory. + traceback=getattr(record, 'traceback', None) ) self._session.add(log) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 12a56f6..ca67628 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -393,7 +393,9 @@ class LogBase(ModelMixin): level = Column(String) msg = Column(String) created_at = Column(DateTime, index=True) - actor = Column(String) + + # In case of failed execution + traceback = Column(Text) # region foreign keys @@ -408,5 +410,4 @@ class LogBase(ModelMixin): # endregion def __repr__(self): - return "<{self.created_at}: [{self.level}] @{self.actor}> {msg}".format( - self=self, msg=self.msg[:50]) + return "{msg}".format(msg=self.msg) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 11b5eb9..6dd2e72 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -39,26 +39,29 @@ class BaseContext(object): """ class PrefixedLogger(object): - def __init__(self, logger, prefix='', task_id=None): - self._logger = logger - self._prefix = prefix + def __init__(self, base_logger, task_id=None): + self._logger = base_logger self._task_id = task_id def __getattr__(self, item): if item.upper() in logging._levelNames: - return partial(getattr(self._logger, item), - extra={'prefix': self._prefix, 'task_id': self._task_id}) + return partial(self._logger_with_task_id, _level=item) else: return getattr(self._logger, item) - def __init__( - self, - name, - service_id, - model_storage, - resource_storage, - workdir=None, - **kwargs): + def _logger_with_task_id(self, *args, **kwargs): + level = kwargs.pop('_level') + kwargs.setdefault('extra', {})['task_id'] = self._task_id + return getattr(self._logger, level)(*args, **kwargs) + + def __init__(self, + name, + service_id, + model_storage, + resource_storage, + execution_id, + workdir=None, + **kwargs): super(BaseContext, self).__init__(**kwargs) self._name = name self._id = generate_uuid(variant='uuid') @@ -66,14 +69,14 @@ class BaseContext(object): self._resource = resource_storage self._service_id = service_id self._workdir = workdir + self._execution_id = execution_id self.logger = None def _register_logger(self, level=None, task_id=None): self.logger = self.PrefixedLogger( - logging.getLogger(logger.TASK_LOGGER_NAME), self.logging_id, task_id=task_id) + logging.getLogger(logger.TASK_LOGGER_NAME), task_id=task_id) self.logger.setLevel(level or logging.DEBUG) if not self.logger.handlers: - self.logger.addHandler(aria_logger.create_console_log_handler()) self.logger.addHandler(self._get_sqla_handler()) def _get_sqla_handler(self): @@ -103,10 +106,6 @@ class BaseContext(object): self.logger.removeHandler(handler) @property - def logging_id(self): - raise NotImplementedError - - @property def model(self): """ Access to the model storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index cbd186c..c383958 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -29,25 +29,11 @@ class BaseOperationContext(BaseContext): Context object used during operation creation and execution """ - def __init__(self, - name, - model_storage, - resource_storage, - service_id, - task_id, - actor_id, - execution_id, - **kwargs): - super(BaseOperationContext, self).__init__( - name=name, - model_storage=model_storage, - resource_storage=resource_storage, - service_id=service_id, - **kwargs) + def __init__(self, task_id, actor_id, **kwargs): + super(BaseOperationContext, self).__init__(**kwargs) self._task_id = task_id self._actor_id = actor_id self._thread_local = threading.local() - self._execution_id = execution_id self._register_logger(task_id=self.task.id) def __repr__(self): @@ -57,10 +43,6 @@ class BaseOperationContext(BaseContext): return '{name}({0})'.format(details, name=self.name) @property - def logging_id(self): - raise NotImplementedError - - @property def task(self): """ The task in the model storage @@ -121,10 +103,6 @@ class NodeOperationContext(BaseOperationContext): """ @property - def logging_id(self): - return self.node.name or self.node.id - - @property def node_template(self): """ the node of the current operation @@ -147,11 +125,6 @@ class RelationshipOperationContext(BaseOperationContext): """ @property - def logging_id(self): - return '{0}->{1}'.format(self.source_node.name or self.source_node.id, - self.target_node.name or self.target_node.id) - - @property def source_node_template(self): """ The source node http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index ad4a2ff..b17c9ef 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -30,7 +30,6 @@ class WorkflowContext(BaseContext): """ def __init__(self, workflow_name, - execution_id, parameters=None, task_max_attempts=1, task_retry_interval=0, @@ -42,7 +41,6 @@ class WorkflowContext(BaseContext): self._task_max_attempts = task_max_attempts self._task_retry_interval = task_retry_interval self._task_ignore_failure = task_ignore_failure - self._execution_id = execution_id self._register_logger() def __repr__(self): @@ -52,8 +50,8 @@ class WorkflowContext(BaseContext): name=self.__class__.__name__, self=self)) @property - def logging_id(self): - return '{0}[{1}]'.format(self._workflow_name, self._execution_id) + def workflow_name(self): + return self._workflow_name @property def execution(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 1cdf1de..c7d2c30 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -92,8 +92,12 @@ class WorkflowRunner(object): tasks_graph=self._tasks_graph) @property + def execution_id(self): + return self._execution_id + + @property def execution(self): - return self._model_storage.execution.get(self._execution_id) + return self._model_storage.execution.get(self.execution_id) @property def service(self): @@ -121,7 +125,7 @@ class WorkflowRunner(object): execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs) # TODO: these two following calls should execute atomically - self._validate_no_active_executions(execution) + # self._validate_no_active_executions(execution) self._model_storage.execution.put(execution) return execution http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 0503142..5c89f68 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -20,6 +20,7 @@ The workflow engine. Executes workflows import time from datetime import datetime +import logging import networkx from aria import logger @@ -40,6 +41,7 @@ class Engine(logger.LoggerMixin): def __init__(self, executor, workflow_context, tasks_graph, **kwargs): super(Engine, self).__init__(**kwargs) + self.logger.addHandler(logging.NullHandler()) self._workflow_context = workflow_context self._execution_graph = networkx.DiGraph() self._executor = executor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index e831bfe..1bc5a1e 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -26,41 +26,43 @@ from .. import events @events.start_task_signal.connect def _start_task_handler(task, **kwargs): - task.context.logger.debug('Event: Starting task: {task.name}'.format(task=task)) + task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} started...' + .format(actor=task.actor, task=task)) @events.on_success_task_signal.connect def _success_task_handler(task, **kwargs): - task.context.logger.debug('Event: Task success: {task.name}'.format(task=task)) + task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} successful' + .format(actor=task.actor, task=task)) @events.on_failure_task_signal.connect -def _failure_operation_handler(task, exception, **kwargs): - error = '{0}: {1}'.format(type(exception).__name__, exception) - task.context.logger.error('Event: Task failure: {task.name} [{error}]'.format( - task=task, error=error)) - +def _failure_operation_handler(task, traceback, **kwargs): + task.context.logger.error( + '{actor.name} {task.interface_name}.{task.operation_name} failed' + .format(actor=task.actor, task=task), extra=dict(traceback=traceback) + ) @events.start_workflow_signal.connect def _start_workflow_handler(context, **kwargs): - context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context)) + context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context)) @events.on_failure_workflow_signal.connect def _failure_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context)) @events.on_success_workflow_signal.connect def _success_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context)) @events.on_cancelled_workflow_signal.connect def _cancel_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context)) @events.on_cancelling_workflow_signal.connect def _cancelling_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context)) + context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 4ae046d..39becef 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -44,8 +44,8 @@ class BaseExecutor(logger.LoggerMixin): events.start_task_signal.send(task) @staticmethod - def _task_failed(task, exception): - events.on_failure_task_signal.send(task, exception=exception) + def _task_failed(task, exception, traceback=None): + events.on_failure_task_signal.send(task, exception=exception, traceback=traceback) @staticmethod def _task_succeeded(task): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index dc369ab..a5d3cce 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -25,6 +25,8 @@ import sys # As part of the process executor implementation, subprocess are started with this module as their # entry point. We thus remove this module's directory from the python path if it happens to be # there +from aria.modeling.models import Parameter + script_dir = os.path.dirname(__file__) if script_dir in sys.path: sys.path.remove(script_dir) @@ -48,7 +50,6 @@ from aria.utils import exceptions from aria.orchestrator.workflows.executor import base from aria.storage import instrumentation from aria.modeling import types as modeling_types -from aria.modeling.models import Parameter _IS_WIN = os.name == 'nt' @@ -148,8 +149,10 @@ class ProcessExecutor(base.BaseExecutor): def _create_arguments_dict(self, task): return { 'task_id': task.id, - 'implementation': task.implementation, - 'operation_inputs': Parameter.unwrap_dict(task.inputs), + # 'implementation': task.implementation, + 'implementation': 'aria.orchestrator.execution_plugin.operations.run_script_locally', + # 'operation_inputs': Parameter.unwrap_dict(task.inputs), + 'operation_inputs': dict(script_path=task.implementation), 'port': self._server_port, 'context': task.context.serialization_dict, } @@ -234,9 +237,10 @@ class ProcessExecutor(base.BaseExecutor): except BaseException as e: e.message += 'Task failed due to {0}.'.format(request['exception']) + \ UPDATE_TRACKED_CHANGES_FAILED_STR - self._task_failed(task, exception=e) + self._task_failed( + task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info())) else: - self._task_failed(task, exception=request['exception']) + self._task_failed(task, exception=request['exception'], traceback=request['traceback']) def _handle_apply_tracked_changes_request(self, task_id, request, response): task = self._tasks[task_id] @@ -320,6 +324,7 @@ class _Messenger(object): 'type': type, 'task_id': self.task_id, 'exception': exceptions.wrap_if_needed(exception), + 'traceback': exceptions.get_exception_as_string(*sys.exc_info()), 'tracked_changes': tracked_changes }) response = _recv_message(sock) @@ -383,7 +388,6 @@ def _main(): # This is required for the instrumentation work properly. # See docstring of `remove_mutable_association_listener` for further details modeling_types.remove_mutable_association_listener() - with instrumentation.track_changes() as instrument: try: ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 8b443cc..ad5e41a 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -20,7 +20,9 @@ Thread based executor import Queue import threading -from aria.utils import imports +import sys + +from aria.utils import imports, exceptions from .base import BaseExecutor from ....modeling.models import Parameter @@ -64,7 +66,9 @@ class ThreadExecutor(BaseExecutor): task_func(ctx=task.context, **inputs) self._task_succeeded(task) except BaseException as e: - self._task_failed(task, exception=e) + self._task_failed(task, + exception=e, + traceback=exceptions.get_exception_as_string(*sys.exc_info())) # Daemon threads except BaseException as e: pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/examples/hello-world/scripts/stop.sh ---------------------------------------------------------------------- diff --git a/examples/hello-world/scripts/stop.sh b/examples/hello-world/scripts/stop.sh index 5461caf..400726e 100755 --- a/examples/hello-world/scripts/stop.sh +++ b/examples/hello-world/scripts/stop.sh @@ -1,5 +1,8 @@ #!/bin/bash + +bla bla bla + set -e TEMP_DIR="/tmp" http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ba237eba/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index d84d1ec..e39a993 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -15,6 +15,7 @@ import logging import uuid +from collections import namedtuple from contextlib import contextmanager import pytest @@ -119,6 +120,7 @@ class MockTask(object): self.ignore_failure = False self.interface_name = 'interface_name' self.operation_name = 'operation_name' + self.actor = namedtuple('actor', 'name')(name='actor_name') for state in models.Task.STATES: setattr(self, state.upper(), state)