Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue 0942b2e2e -> 5194ad2ac (forced update)
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5194ad2a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5194ad2a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5194ad2a Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue Commit: 5194ad2acad91b719fbde2cff575a8704a844e05 Parents: 16fcca4 Author: max-orlov <ma...@gigaspaces.com> Authored: Tue May 9 17:24:31 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Tue May 9 17:25:55 2017 +0300 ---------------------------------------------------------------------- aria/logger.py | 22 ++++++---------------- aria/orchestrator/context/common.py | 10 +++------- aria/storage/instrumentation.py | 21 ++++++++++++++++++--- tests/orchestrator/context/test_operation.py | 2 +- 4 files changed, 28 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5194ad2a/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 97d3878..9214bd9 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -114,14 +114,13 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None): return console -def create_sqla_log_handler(session, engine, log_cls, execution_id, level=logging.DEBUG): +def create_sqla_log_handler(model, log_cls, execution_id, level=logging.DEBUG): # This is needed since the engine and session are entirely new we need to reflect the db # schema of the logging model into the engine and session. - log_cls.__table__.create(bind=engine, checkfirst=True) + log_cls.__table__.create(bind=model.log._engine, checkfirst=True) - return _SQLAlchemyHandler(session=session, - engine=engine, + return _SQLAlchemyHandler(model=model, log_cls=log_cls, execution_id=execution_id, level=level) @@ -168,10 +167,9 @@ def create_file_log_handler( class _SQLAlchemyHandler(logging.Handler): - def __init__(self, session, engine, log_cls, execution_id, **kwargs): + def __init__(self, model, log_cls, execution_id, **kwargs): logging.Handler.__init__(self, **kwargs) - self._session = session - self._engine = engine + self._model = model self._cls = log_cls self._execution_id = execution_id @@ -188,15 +186,7 @@ class _SQLAlchemyHandler(logging.Handler): # Not mandatory. traceback=getattr(record, 'traceback', None) ) - self._session.add(log) - - try: - self._session.commit() - except BaseException: - self._session.rollback() - raise - finally: - self._session.close() + self._model.log.put(log) _default_file_formatter = logging.Formatter( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5194ad2a/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 64ef9a4..c0047e9 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -79,13 +79,9 @@ class BaseContext(object): self.logger.addHandler(self._get_sqla_handler()) def _get_sqla_handler(self): - api_kwargs = {} - if self._model._initiator: - api_kwargs.update(self._model._initiator(**self._model._initiator_kwargs)) - api_kwargs.update(**self._model._api_kwargs) - return aria_logger.create_sqla_log_handler(log_cls=modeling.models.Log, - execution_id=self._execution_id, - **api_kwargs) + return aria_logger.create_sqla_log_handler(model=self._model, + log_cls=modeling.models.Log, + execution_id=self._execution_id) def __repr__(self): return ( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5194ad2a/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index cf2a365..14d4423 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -26,7 +26,15 @@ from ..storage.exceptions import StorageError _VERSION_ID_COL = 'version' _STUB = object() _INSTRUMENTED = { - _models.Node.runtime_properties: dict + _models.Node.runtime_properties: dict, + + # Log related stuff + _models.Log.level: str, + _models.Log.msg: str, + _models.Log.traceback: str, + _models.Log.created_at: lambda date: date, + _models.Log.execution_fk: int, + _models.Log.task_fk: int, } @@ -178,11 +186,18 @@ def apply_tracked_changes(tracked_changes, model): for attribute_name, value in tracked_attributes.items(): if value.initial != value.current: if not instance: - instance = mapi.get(instance_id) + # The object can be entirely new (Log is an example of this use case, + # its id is None (or 'null'), thus we need to create it from scratch, + # and not just update it. + instance = mapi.model_cls() if 'null' else mapi.get(instance_id) setattr(instance, attribute_name, value.current) if instance: _validate_version_id(instance, mapi) - mapi.update(instance) + # This follows the same logic as the same comment regarding 'null' + if instance_id == 'null': + mapi.put(instance) + else: + mapi.update(instance) successfully_updated_changes[mapi_name][instance_id] = [ v.dict for v in tracked_attributes.values()] except BaseException: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5194ad2a/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index cdeb5fa..757a375 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -263,7 +263,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): @pytest.fixture(params=[ - (thread.ThreadExecutor, {}), + # (thread.ThreadExecutor, {}), (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), ]) def executor(request):