finalizing
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4e3fb007 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4e3fb007 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4e3fb007 Branch: refs/heads/ARIA-106-Create-sqla-logging-handler Commit: 4e3fb00743f3f8cf6fe66e9a6d8378699458884a Parents: eba3e87 Author: mxmrlv <mxm...@gmail.com> Authored: Tue Feb 21 14:29:53 2017 +0200 Committer: mxmrlv <mxm...@gmail.com> Committed: Tue Feb 21 15:24:34 2017 +0200 ---------------------------------------------------------------------- aria/logger.py | 29 +++++++---- aria/orchestrator/context/common.py | 27 +++++------ aria/orchestrator/context/operation.py | 4 ++ aria/orchestrator/decorators.py | 16 +++--- .../orchestrator/execution_plugin/operations.py | 4 +- aria/orchestrator/workflows/executor/process.py | 7 ++- aria/orchestrator/workflows/executor/thread.py | 15 +++--- aria/storage/core.py | 4 +- aria/storage/modeling/orchestrator_elements.py | 2 +- tests/orchestrator/context/test_operation.py | 51 ++++++++++++-------- .../workflows/executor/test_executor.py | 6 ++- tests/test_logger.py | 3 +- 12 files changed, 99 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 9bb62d7..ae6d4a6 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -99,6 +99,10 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None): return console +def create_sqla_log_handler(session, engine, level=logging.DEBUG): + return _SQLAlchemyHandler(session=session, engine=engine, level=level) + + class _DefaultConsoleFormat(logging.Formatter): """ _DefaultConsoleFormat class @@ -108,10 +112,11 @@ class 
_DefaultConsoleFormat(logging.Formatter): """ def format(self, record): try: - if record.levelno == logging.INFO: - self._fmt = '%(message)s' + if hasattr(record, 'prefix'): + self._fmt = '%(asctime)s: [%(levelname)s] @%(prefix)s ->%(message)s' else: - self._fmt = '%(levelname)s: %(message)s' + self._fmt = '%(asctime)s: [%(levelname)s] %(message)s' + except AttributeError: return record.message return logging.Formatter.format(self, record) @@ -137,24 +142,28 @@ def create_file_log_handler( return rotating_file -class SQLAlchemyHandler(logging.Handler): +class _SQLAlchemyHandler(logging.Handler): - def __init__(self, session, engine, log_cls, **kwargs): + def __init__(self, session, engine, **kwargs): logging.Handler.__init__(self, **kwargs) self._session = session self._engine = engine - self._cls = log_cls - def emit(self, record): - # CHECK: why is this needed? - self._cls.__table__.create(bind=self._engine, checkfirst=True) + # Cyclic dependency + from aria.storage.modeling.model import Log + self._cls = Log + def emit(self, record): + # pylint fails to recognize that this class does indeed have __table__ + self._cls.__table__.create(bind=self._engine, checkfirst=True) # pylint: disable=no-member + created_at = datetime.strptime(logging.Formatter('%(asctime)s').formatTime(record), + '%Y-%m-%d %H:%M:%S,%f') log = self._cls( prefix=record.prefix, logger=record.name, level=record.levelname, msg=record.msg, - created_at=datetime.utcnow(), + created_at=created_at, ) self._session.add(log) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 0c8c2f0..902598e 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -23,7 +23,7 @@ import logging import jinja2 from aria import logger as aria_logger -from aria.storage 
import exceptions, modeling +from aria.storage import exceptions class BaseContext(aria_logger.LoggerMixin): @@ -62,7 +62,7 @@ class BaseContext(aria_logger.LoggerMixin): def _get_sqla_handler(self): api_kwargs = self._model._initiator(**self._model._initiator_kwargs) api_kwargs.update(**self._model._api_kwargs) - return aria_logger.SQLAlchemyHandler(log_cls=modeling.model.Log, **api_kwargs) + return aria_logger.create_sqla_log_handler(**api_kwargs) def __repr__(self): return ( @@ -71,21 +71,18 @@ class BaseContext(aria_logger.LoggerMixin): .format(name=self.__class__.__name__, self=self)) @contextmanager - def self_logging(self, logger=None): - if not logger: - sqla_handler = self._get_sqla_handler() - console_handler = aria_logger.create_console_log_handler() - if sqla_handler not in self.logger.handlers: - # self.logger.addHandler(aria_logger.create_console_log_handler()) - self.logger.addHandler(sqla_handler) + def self_logging(self, handlers=None): + handlers = set(handlers) if handlers else set() + try: + handlers.add(aria_logger.create_console_log_handler()) + handlers.add(self._get_sqla_handler()) + for handler in handlers: + self.logger.addHandler(handler) self.logger = self.PrefixedLogger(self.logger, self.logging_id) yield self.logger - sqla_handler._session.close() - self.logger.removeHandler(sqla_handler) - self.logger.removeHandler(console_handler) - else: - self.logger = logger - yield self.logger + finally: + for handler in handlers: + self.logger.removeHandler(handler) @property def logging_id(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 766ebb5..97a09aa 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -52,6 +52,10 @@ class BaseOperationContext(BaseContext): 
return '{name}({0})'.format(details, name=self.name) @property + def logging_id(self): + raise NotImplementedError + + @property def task(self): """ The task in the model storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/orchestrator/decorators.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py index 1cc97db..f915813 100644 --- a/aria/orchestrator/decorators.py +++ b/aria/orchestrator/decorators.py @@ -29,7 +29,7 @@ from .workflows.api import task_graph WORKFLOW_DECORATOR_RESERVED_ARGUMENTS = ('ctx', 'graph') -def workflow(func=None, suffix_template='', logger=None): +def workflow(func=None, suffix_template=''): """ Workflow decorator """ @@ -49,18 +49,21 @@ def workflow(func=None, suffix_template='', logger=None): workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name)) validate_function_arguments(func, workflow_parameters) with context.workflow.current.push(ctx): - with ctx.self_logging(logger): - func(**workflow_parameters) + func(**workflow_parameters) return workflow_parameters['graph'] return _wrapper -def operation(func=None, toolbelt=False, suffix_template='', logger=False): +def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=None): """ Operation decorator """ + if func is None: - return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt) + return partial(operation, + suffix_template=suffix_template, + toolbelt=toolbelt, + logging_handlers=logging_handlers) @wraps(func) def _wrapper(**func_kwargs): @@ -68,7 +71,8 @@ def operation(func=None, toolbelt=False, suffix_template='', logger=False): operation_toolbelt = context.toolbelt(func_kwargs['ctx']) func_kwargs.setdefault('toolbelt', operation_toolbelt) validate_function_arguments(func, func_kwargs) - with func_kwargs['ctx'].self_logging(logger): + + with 
func_kwargs['ctx'].self_logging(handlers=logging_handlers): return func(**func_kwargs) return _wrapper http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/orchestrator/execution_plugin/operations.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/operations.py b/aria/orchestrator/execution_plugin/operations.py index c86f697..5effa8a 100644 --- a/aria/orchestrator/execution_plugin/operations.py +++ b/aria/orchestrator/execution_plugin/operations.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from aria.orchestrator import operation from . import local as local_operations from .ssh import operations as ssh_operations @@ -50,7 +48,7 @@ def run_script_with_ssh(ctx, **kwargs) -@operation(logger=logging.getLogger()) +@operation def run_commands_with_ssh(ctx, commands, fabric_env=None, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 5f8a813..991fec6 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -200,7 +200,9 @@ class ProcessExecutor(base.BaseExecutor): request_handler = self._request_handlers.get(request_type) if not request_handler: raise RuntimeError('Invalid request type: {0}'.format(request_type)) - request_handler(task_id=request['task_id'], request=request, response=response) + task_id = request['task_id'] + # with self._tasks[task_id].self_logging(): + request_handler(task_id=task_id, request=request, response=response) except BaseException as e: self.logger.debug('Error in process executor listener: {0}'.format(e)) @@ -252,7 +254,8 @@ class 
ProcessExecutor(base.BaseExecutor): def _send_message(connection, message): - # Packing the length of the entire msg using struct.pack. This enables later reading of the content. + # Packing the length of the entire msg using struct.pack. + # This enables later reading of the content. def _pack(data): return struct.pack(_INT_FMT, len(data)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 359513e..6c59986 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -55,14 +55,13 @@ class ThreadExecutor(BaseExecutor): while not self._stopped: try: task = self._queue.get(timeout=1) - with task.self_logging(): - self._task_started(task) - try: - task_func = imports.load_attribute(task.implementation) - task_func(ctx=task.context, **task.inputs) - self._task_succeeded(task) - except BaseException as e: - self._task_failed(task, exception=e) + self._task_started(task) + try: + task_func = imports.load_attribute(task.implementation) + task_func(ctx=task.context, **task.inputs) + self._task_succeeded(task) + except BaseException as e: + self._task_failed(task, exception=e) # Daemon threads except BaseException as e: pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py index eaf2bac..7d70070 100644 --- a/aria/storage/core.py +++ b/aria/storage/core.py @@ -141,7 +141,7 @@ class ModelStorage(Storage): kwargs['initiator'] = sql_mapi.init_storage super(ModelStorage, self).__init__(*args, **kwargs) - def register(self, model_cls, create_all=True): + def register(self, model_cls): """ Register the 
model into the model storage. :param model_cls: the model to register. @@ -155,7 +155,7 @@ class ModelStorage(Storage): self.registered[model_name] = self.api(name=model_name, model_cls=model_cls, **self.all_api_kwargs) - self.registered[model_name].create(create_all=create_all) + self.registered[model_name].create() self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self)) def drop(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/aria/storage/modeling/orchestrator_elements.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py index 366defc..47fe49f 100644 --- a/aria/storage/modeling/orchestrator_elements.py +++ b/aria/storage/modeling/orchestrator_elements.py @@ -479,5 +479,5 @@ class LogBase(ModelMixin): description = Column(String) def __repr__(self): - return "<[{self.level}] {self.created_at}@{self.prefix}> {msg}".format( + return "{self.created_at}: [{self.level}] @{self.prefix} ->{msg}".format( self=self, msg=self.msg[:50]) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index f6cc499..ccac9e3 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -15,7 +15,11 @@ import os +import logging +import tempfile + import pytest + from aria.orchestrator.workflows.executor import process, thread from aria import ( @@ -45,6 +49,17 @@ def ctx(tmpdir): storage.release_sqlite_storage(context.model) +tmp_file = os.path.join(tempfile.gettempdir(), 'op_testing_const_file_name') + + +@pytest.fixture(autouse=True) +def tmpfile_cleanup(): + try: + yield + finally: + if 
os.path.isfile(tmp_file): + os.remove(tmp_file) + @pytest.fixture def thread_executor(): result = thread.ThreadExecutor() @@ -229,17 +244,13 @@ def test_operation_logging(ctx, process_executor): node.interfaces = [interface] ctx.model.node.update(node) - wf_start = 'wf_start' - wf_end = 'wf_end' - inputs = { 'op_start': 'op_start', 'op_end': 'op_end', } @workflow - def basic_workflow(graph, ctx, **_): - ctx.logger.info(wf_start) + def basic_workflow(graph, **_): graph.add_tasks( api.task.OperationTask.node( name=operation_name, @@ -247,40 +258,40 @@ def test_operation_logging(ctx, process_executor): inputs=inputs ) ) - ctx.logger.info(wf_end) execute(workflow_func=basic_workflow, workflow_context=ctx, executor=process_executor) logs = ctx.model.log.list() - assert len(logs) == 4 + assert len(logs) == 2 op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info'] assert len(op_start_log) == 1 op_start_log = op_start_log[0] - op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'info'] + op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'debug'] assert len(op_end_log) == 1 op_end_log = op_end_log[0] - wf_start_log = [l for l in logs if wf_start in l.msg and l.level.lower() == 'info'] - assert len(wf_start_log) == 1 - wf_start_log = wf_start_log[0] + assert op_start_log.created_at < op_end_log.created_at - wf_end_log = [l for l in logs if wf_end in l.msg and l.level.lower() == 'info'] - assert len(wf_end_log) == 1 - wf_end_log = wf_end_log[0] + with open(tmp_file, 'r') as f: + logs = [l.strip() for l in f.readlines()] - assert (wf_start_log.created_at < - wf_end_log.created_at < - op_start_log.created_at < - op_end_log.created_at) + assert inputs['op_start'] in logs + assert inputs['op_end'] in logs -@operation +class MockLogHandler(logging.Handler): + def emit(self, record): + with open(tmp_file, 'a+') as f: + f.write(record.msg + '\n') + + 
+@operation(logging_handlers=set([MockLogHandler()])) def logged_operation(ctx, **_): ctx.logger.info(ctx.task.inputs['op_start']) - ctx.logger.info(ctx.task.inputs['op_end']) + ctx.logger.debug(ctx.task.inputs['op_end']) @operation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 953efb2..64dfb66 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -83,7 +83,7 @@ class MockException(Exception): class MockContext(object): def __init__(self, *args, **kwargs): - pass + self.logger = logging.getLogger() def __getattr__(self, item): if item == 'serialization_dict': @@ -95,6 +95,10 @@ class MockContext(object): def deserialize_from_dict(cls, **kwargs): return cls() + @contextmanager + def self_logging(self): + yield self.logger + class MockTask(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e3fb007/tests/test_logger.py ---------------------------------------------------------------------- diff --git a/tests/test_logger.py b/tests/test_logger.py index 70f08bb..1ad055c 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -53,7 +53,8 @@ def test_create_console_log_handler(capsys): logger.info(info_test_string) logger.debug(debug_test_string) _, err = capsys.readouterr() - assert err.count('DEBUG: {test_string}'.format(test_string=debug_test_string)) == 1 + + assert err.count('[DEBUG] {test_string}'.format(test_string=debug_test_string)) assert err.count(info_test_string) == 1 # Custom handler