ARIA-213 Sporadic tests failures over locked database issue Move from 2 different sessions - one for the log, and the other for general model operations, to one single session, while utilizing the keep tracking of changes mechanism for both logs and node/task states.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2ee06b8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2ee06b8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2ee06b8a Branch: refs/heads/ARIA-208-Missing-back-refrences-for-models Commit: 2ee06b8a6abe79f429458c7dbc5f9e1c31aec589 Parents: 6864d42 Author: max-orlov <ma...@gigaspaces.com> Authored: Tue May 9 17:24:31 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu May 11 17:24:15 2017 +0300 ---------------------------------------------------------------------- aria/logger.py | 25 +---- aria/orchestrator/context/common.py | 13 +-- .../workflows/core/events_handler.py | 4 +- aria/orchestrator/workflows/executor/process.py | 51 ++++++---- aria/storage/instrumentation.py | 97 +++++++++++++++++--- .../orchestrator/workflows/executor/__init__.py | 21 ++++- .../workflows/executor/test_executor.py | 36 ++++++-- .../workflows/executor/test_process_executor.py | 13 ++- tests/storage/test_instrumentation.py | 37 +++++++- 9 files changed, 219 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 97d3878..bd7ed4e 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -114,17 +114,11 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None): return console -def create_sqla_log_handler(session, engine, log_cls, execution_id, level=logging.DEBUG): +def create_sqla_log_handler(model, log_cls, execution_id, level=logging.DEBUG): # This is needed since the engine and session are entirely new we need to reflect the db # schema of the logging model into the engine and session. - log_cls.__table__.create(bind=engine, checkfirst=True) - - return _SQLAlchemyHandler(session=session, - engine=engine, - log_cls=log_cls, - execution_id=execution_id, - level=level) + return _SQLAlchemyHandler(model=model, log_cls=log_cls, execution_id=execution_id, level=level) class _DefaultConsoleFormat(logging.Formatter): @@ -168,10 +162,9 @@ def create_file_log_handler( class _SQLAlchemyHandler(logging.Handler): - def __init__(self, session, engine, log_cls, execution_id, **kwargs): + def __init__(self, model, log_cls, execution_id, **kwargs): logging.Handler.__init__(self, **kwargs) - self._session = session - self._engine = engine + self._model = model self._cls = log_cls self._execution_id = execution_id @@ -188,15 +181,7 @@ class _SQLAlchemyHandler(logging.Handler): # Not mandatory. traceback=getattr(record, 'traceback', None) ) - self._session.add(log) - - try: - self._session.commit() - except BaseException: - self._session.rollback() - raise - finally: - self._session.close() + self._model.log.put(log) _default_file_formatter = logging.Formatter( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 64ef9a4..0854a27 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -79,13 +79,9 @@ class BaseContext(object): self.logger.addHandler(self._get_sqla_handler()) def _get_sqla_handler(self): - api_kwargs = {} - if self._model._initiator: - api_kwargs.update(self._model._initiator(**self._model._initiator_kwargs)) - api_kwargs.update(**self._model._api_kwargs) - return aria_logger.create_sqla_log_handler(log_cls=modeling.models.Log, - execution_id=self._execution_id, - **api_kwargs) + return aria_logger.create_sqla_log_handler(model=self._model, + log_cls=modeling.models.Log, + execution_id=self._execution_id) def __repr__(self): return ( @@ -196,7 +192,6 @@ class BaseContext(object): def _render_resource(self, resource_content, variables): variables = variables or {} - if 'ctx' not in variables: - variables['ctx'] = self + variables.setdefault('ctx', self) resource_template = jinja2.Template(resource_content) return resource_template.render(variables) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index f3e4e7e..669fb43 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -40,7 +40,7 @@ def _task_started(task, *args, **kwargs): with task._update(): task.started_at = datetime.utcnow() task.status = task.STARTED - _update_node_state_if_necessary(task, is_transitional=True) + _update_node_state_if_necessary(task, is_transitional=True) @events.on_failure_task_signal.connect @@ -74,7 +74,7 @@ def _task_succeeded(task, *args, **kwargs): task.ended_at = datetime.utcnow() task.status = task.SUCCESS - _update_node_state_if_necessary(task) + _update_node_state_if_necessary(task) @events.start_workflow_signal.connect http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index e464f7d..824c4e1 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -229,6 +229,7 @@ class ProcessExecutor(base.BaseExecutor): def _apply_tracked_changes(task, request): instrumentation.apply_tracked_changes( tracked_changes=request['tracked_changes'], + new_instances=request['new_instances'], model=task.context.model) @@ -277,22 +278,28 @@ class _Messenger(object): """Task started message""" self._send_message(type='started') - def succeeded(self, tracked_changes): + def succeeded(self, tracked_changes, new_instances): """Task succeeded message""" - self._send_message(type='succeeded', tracked_changes=tracked_changes) + self._send_message( + type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances) - def failed(self, tracked_changes, exception): + def failed(self, tracked_changes, new_instances, exception): """Task failed message""" - self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception) + self._send_message(type='failed', + tracked_changes=tracked_changes, + new_instances=new_instances, + exception=exception) - def apply_tracked_changes(self, tracked_changes): - self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes) + def apply_tracked_changes(self, tracked_changes, new_instances): + self._send_message(type='apply_tracked_changes', + tracked_changes=tracked_changes, + new_instances=new_instances) def closed(self): """Executor closed message""" self._send_message(type='closed') - def _send_message(self, type, tracked_changes=None, exception=None): + def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(('localhost', self.port)) try: @@ -301,7 +308,8 @@ class _Messenger(object): 'task_id': self.task_id, 'exception': exceptions.wrap_if_needed(exception), 'traceback': exceptions.get_exception_as_string(*sys.exc_info()), - 'tracked_changes': tracked_changes + 'tracked_changes': tracked_changes or {}, + 'new_instances': new_instances or {} }) response = _recv_message(sock) response_exception = response.get('exception') @@ -311,7 +319,7 @@ class _Messenger(object): sock.close() -def _patch_session(ctx, messenger, instrument): +def _patch_ctx(ctx, messenger, instrument): # model will be None only in tests that test the executor component directly if not ctx.model: return @@ -326,12 +334,13 @@ def _patch_session(ctx, messenger, instrument): original_refresh(target) def patched_commit(): - messenger.apply_tracked_changes(instrument.tracked_changes) + messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances) + instrument.expunge_session() instrument.clear() def patched_rollback(): # Rollback is performed on parent process when commit fails - pass + instrument.expunge_session() # when autoflush is set to true (the default), refreshing an object will trigger # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so @@ -363,21 +372,29 @@ def _main(): # This is required for the instrumentation work properly. # See docstring of `remove_mutable_association_listener` for further details modeling_types.remove_mutable_association_listener() + try: + ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) + except BaseException as e: + messenger.failed(exception=e, tracked_changes=None, new_instances=None) + return - with instrumentation.track_changes() as instrument: + with instrumentation.track_changes(ctx.model) as instrument: try: messenger.started() - ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) - _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) + _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument) task_func = imports.load_attribute(implementation) aria.install_aria_extensions() for decorate in process_executor.decorate(): task_func = decorate(task_func) task_func(ctx=ctx, **operation_inputs) - messenger.succeeded(tracked_changes=instrument.tracked_changes) + messenger.succeeded(tracked_changes=instrument.tracked_changes, + new_instances=instrument.new_instances) except BaseException as e: - messenger.failed(exception=e, tracked_changes=instrument.tracked_changes) - + messenger.failed(exception=e, + tracked_changes=instrument.tracked_changes, + new_instances=instrument.new_instances) + finally: + instrument.expunge_session() if __name__ == '__main__': _main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index cf2a365..390f933 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -13,9 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import copy import json +import os import sqlalchemy.event @@ -26,11 +26,19 @@ from ..storage.exceptions import StorageError _VERSION_ID_COL = 'version' _STUB = object() _INSTRUMENTED = { - _models.Node.runtime_properties: dict + 'modified': { + _models.Node.runtime_properties: dict, + _models.Node.state: str, + _models.Task.status: str, + }, + 'new': (_models.Log, ) + } +_NEW_INSTANCE = 'NEW_INSTANCE' + -def track_changes(instrumented=None): +def track_changes(model=None, instrumented=None): """Track changes in the specified model columns This call will register event listeners using sqlalchemy's event mechanism. The listeners @@ -50,32 +58,78 @@ def track_changes(instrumented=None): will then call ``apply_tracked_changes()`` that resides in this module as well. At that point, the changes will actually be written back to the database. + :param model: the model storage. it should hold a mapi for each model. the session of each mapi + is needed to setup events :param instrumented: A dict from model columns to their python native type :return: The instrumentation context """ - return _Instrumentation(instrumented or _INSTRUMENTED) + return _Instrumentation(model, instrumented or _INSTRUMENTED) class _Instrumentation(object): - def __init__(self, instrumented): + def __init__(self, model, instrumented): self.tracked_changes = {} + self.new_instances = {} self.listeners = [] + self._instances_to_expunge = [] + self._model = model self._track_changes(instrumented) + @property + def _new_instance_id(self): + return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, + index=len(self._instances_to_expunge)) + + def expunge_session(self): + for new_instance in self._instances_to_expunge: + self._get_session_from_model(new_instance.__tablename__).expunge(new_instance) + + def _get_session_from_model(self, tablename): + mapi = getattr(self._model, tablename, None) + if mapi: + return mapi._session + raise StorageError("Could not retrieve session for {0}".format(tablename)) + def _track_changes(self, instrumented): - instrumented_classes = {} - for instrumented_attribute, attribute_type in instrumented.items(): + instrumented_attribute_classes = {} + # Track any newly-set attributes. + for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items(): self._register_set_attribute_listener( instrumented_attribute=instrumented_attribute, attribute_type=attribute_type) instrumented_class = instrumented_attribute.parent.entity - instrumented_class_attributes = instrumented_classes.setdefault(instrumented_class, {}) + instrumented_class_attributes = instrumented_attribute_classes.setdefault( + instrumented_class, {}) instrumented_class_attributes[instrumented_attribute.key] = attribute_type - for instrumented_class, instrumented_attributes in instrumented_classes.items(): - self._register_instance_listeners( - instrumented_class=instrumented_class, - instrumented_attributes=instrumented_attributes) + + # Track any global instance update such as 'refresh' or 'load' + for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items(): + self._register_instance_listeners(instrumented_class=instrumented_class, + instrumented_attributes=instrumented_attributes) + + # Track any newly created instances. + for instrumented_class in instrumented.get('new', {}): + self._register_new_instance_listener(instrumented_class) + + def _register_new_instance_listener(self, instrumented_class): + if self._model is None: + raise StorageError("In order to keep track of new instances, a ctx is needed") + + def listener(_, instance): + if not isinstance(instance, instrumented_class): + return + self._instances_to_expunge.append(instance) + tracked_instances = self.new_instances.setdefault(instance.__modelname__, {}) + tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {}) + instance_as_dict = instance.to_dict() + instance_as_dict.update((k, getattr(instance, k)) + for k in getattr(instance, '__private_fields__', [])) + tracked_attributes.update(instance_as_dict) + session = self._get_session_from_model(instrumented_class.__tablename__) + listener_args = (session, 'after_attach', listener) + sqlalchemy.event.listen(*listener_args) + self.listeners.append(listener_args) def _register_set_attribute_listener(self, instrumented_attribute, attribute_type): def listener(target, value, *_): @@ -125,6 +179,9 @@ class _Instrumentation(object): else: self.tracked_changes.clear() + self.new_instances.clear() + self._instances_to_expunge = [] + def restore(self): """Remove all listeners registered by this instrumentation""" for listener_args in self.listeners: @@ -160,7 +217,7 @@ class _Value(object): return {'initial': self.initial, 'current': self.current}.copy() -def apply_tracked_changes(tracked_changes, model): +def apply_tracked_changes(tracked_changes, new_instances, model): """Write tracked changes back to the database using provided model storage :param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context @@ -169,6 +226,7 @@ def apply_tracked_changes(tracked_changes, model): """ successfully_updated_changes = dict() try: + # handle instance updates for mapi_name, tracked_instances in tracked_changes.items(): successfully_updated_changes[mapi_name] = dict() mapi = getattr(model, mapi_name) @@ -177,18 +235,27 @@ def apply_tracked_changes(tracked_changes, model): instance = None for attribute_name, value in tracked_attributes.items(): if value.initial != value.current: - if not instance: - instance = mapi.get(instance_id) + instance = instance or mapi.get(instance_id) setattr(instance, attribute_name, value.current) if instance: _validate_version_id(instance, mapi) mapi.update(instance) successfully_updated_changes[mapi_name][instance_id] = [ v.dict for v in tracked_attributes.values()] + + # Handle new instances + for mapi_name, new_instance in new_instances.items(): + successfully_updated_changes[mapi_name] = dict() + mapi = getattr(model, mapi_name) + for new_instance_kwargs in new_instance.values(): + instance = mapi.model_cls(**new_instance_kwargs) + mapi.put(instance) + successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs except BaseException: for key, value in successfully_updated_changes.items(): if not value: del successfully_updated_changes[key] + # TODO: if the successful has _STUB, the logging fails because it can't serialize the object model.logger.error( 'Registering all the changes to the storage has failed. {0}' 'The successful updates were: {0} ' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index cedcc5f..8ad8edb 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -17,6 +17,7 @@ import logging from collections import namedtuple from contextlib import contextmanager +import aria from aria.modeling import models @@ -24,7 +25,7 @@ class MockTask(object): INFINITE_RETRIES = models.Task.INFINITE_RETRIES - def __init__(self, implementation, inputs=None, plugin=None): + def __init__(self, implementation, inputs=None, plugin=None, storage=None): self.implementation = self.name = implementation self.plugin_fk = plugin.id if plugin else None self.plugin = plugin or None @@ -33,7 +34,7 @@ class MockTask(object): self.exception = None self.id = str(uuid.uuid4()) self.logger = logging.getLogger() - self.context = MockContext() + self.context = MockContext(storage) self.attempts_count = 1 self.max_attempts = 1 self.ignore_failure = False @@ -52,14 +53,24 @@ class MockTask(object): class MockContext(object): - def __init__(self): + def __init__(self, storage=None): self.logger = logging.getLogger('mock_logger') self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) - self.serialization_dict = {'context_cls': self.__class__, 'context': {}} + self.model = storage + + @property + def serialization_dict(self): + if self.model: + return {'context': self.model.serialization_dict, 'context_cls': self.__class__} + else: + return {'context_cls': self.__class__, 'context': {}} def __getattr__(self, item): return None @classmethod def deserialize_from_dict(cls, **kwargs): - return cls() + if kwargs: + return cls(storage=aria.application_model_storage(**kwargs)) + else: + return cls() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index d4482ae..29cb0e8 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -25,6 +25,7 @@ except ImportError: _celery = None app = None +import aria from aria.modeling import models from aria.orchestrator import events from aria.orchestrator.workflows.executor import ( @@ -41,12 +42,13 @@ def _get_implementation(func): return '{module}.{func.__name__}'.format(module=__name__, func=func) -def test_execute(executor): +def execute_and_assert(executor, storage=None): expected_value = 'value' - successful_task = MockTask(_get_implementation(mock_successful_task)) - failing_task = MockTask(_get_implementation(mock_failing_task)) + successful_task = MockTask(_get_implementation(mock_successful_task), storage=storage) + failing_task = MockTask(_get_implementation(mock_failing_task), storage=storage) task_with_inputs = MockTask(_get_implementation(mock_task_with_input), - inputs={'input': models.Parameter.wrap('input', 'value')}) + inputs={'input': models.Parameter.wrap('input', 'value')}, + storage=storage) for task in [successful_task, failing_task, task_with_inputs]: executor.execute(task) @@ -62,6 +64,14 @@ def test_execute(executor): assertion() +def test_thread_execute(thread_executor): + execute_and_assert(thread_executor) + + +def test_process_execute(process_executor, storage): + execute_and_assert(process_executor, storage) + + def mock_successful_task(**_): pass @@ -83,21 +93,35 @@ class MockException(Exception): pass +@pytest.fixture +def storage(tmpdir): + return aria.application_model_storage( + aria.storage.sql_mapi.SQLAlchemyModelAPI, + initiator_kwargs=dict(base_dir=str(tmpdir)) + ) + + @pytest.fixture(params=[ (thread.ThreadExecutor, {'pool_size': 1}), (thread.ThreadExecutor, {'pool_size': 2}), # subprocess needs to load a tests module so we explicitly add the root directory as if # the project has been installed in editable mode - (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), # (celery.CeleryExecutor, {'app': app}) ]) -def executor(request): +def thread_executor(request): executor_cls, executor_kwargs = request.param result = executor_cls(**executor_kwargs) yield result result.close() +@pytest.fixture +def process_executor(): + result = process.ProcessExecutor(python_path=tests.ROOT_DIR) + yield result + result.close() + + @pytest.fixture(autouse=True) def register_signals(): def start_handler(task, *args, **kwargs): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 5f240b2..e6333e8 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -18,6 +18,7 @@ import Queue import pytest +import aria from aria.orchestrator import events from aria.utils.plugin import create as create_plugin from aria.orchestrator.workflows.executor import process @@ -34,8 +35,8 @@ from . import MockTask class TestProcessExecutor(object): - def test_plugin_execution(self, executor, mock_plugin): - task = MockTask('mock_plugin1.operation', plugin=mock_plugin) + def test_plugin_execution(self, executor, mock_plugin, storage): + task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage) queue = Queue.Queue() @@ -81,3 +82,11 @@ def mock_plugin(plugin_manager, tmpdir): source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1') plugin_path = create_plugin(source=source, destination_dir=str(tmpdir)) return plugin_manager.install(source=plugin_path) + + +@pytest.fixture +def storage(tmpdir): + return aria.application_model_storage( + aria.storage.sql_mapi.SQLAlchemyModelAPI, + initiator_kwargs=dict(base_dir=str(tmpdir)) + ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/tests/storage/test_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py index 673103e..bdbb17e 100644 --- a/tests/storage/test_instrumentation.py +++ b/tests/storage/test_instrumentation.py @@ -227,6 +227,7 @@ class TestInstrumentation(object): instrumentation.apply_tracked_changes( tracked_changes=instrument.tracked_changes, + new_instances={}, model=storage) instance1_1, instance1_2, instance2_1, instance2_2 = get_instances() @@ -273,8 +274,40 @@ class TestInstrumentation(object): instrument.clear() assert instrument.tracked_changes == {} - def _track_changes(self, instrumented): - instrument = instrumentation.track_changes(instrumented) + def test_new_instances(self, storage): + model_kwargs = dict( + name='name', + dict1={'initial': 'value'}, + dict2={'initial': 'value'}, + list1=['initial'], + list2=['initial'], + int1=0, + int2=0, + string2='string') + model_instance_1 = MockModel1(**model_kwargs) + model_instance_2 = MockModel2(**model_kwargs) + + instrument = self._track_changes(model=storage, instrumented_new=(MockModel1,)) + assert not instrument.tracked_changes + + storage.mock_model_1.put(model_instance_1) + storage.mock_model_2.put(model_instance_2) + # Assert all models made it to storage + assert len(storage.mock_model_1.list()) == len(storage.mock_model_2.list()) == 1 + + # Assert only one model was tracked + assert len(instrument.new_instances) == 1 + + mock_model_1 = instrument.new_instances[MockModel1.__tablename__].values()[0] + storage_model1_instance = storage.mock_model_1.get(model_instance_1.id) + + for key in model_kwargs: + assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key) + + def _track_changes(self, instrumented_modified=None, model=None, instrumented_new=None): + instrument = instrumentation.track_changes( + model=model, + instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}}) instruments_holder.append(instrument) return instrument