Repository: incubator-ariatosca Updated Branches: refs/heads/runtime_props_to_attr 577620341 -> 725b7f040 (forced update)
test fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/725b7f04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/725b7f04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/725b7f04 Branch: refs/heads/runtime_props_to_attr Commit: 725b7f040db1cbb6cca27f92cfcce3a6605ef223 Parents: d664259 Author: max-orlov <ma...@gigaspaces.com> Authored: Wed May 17 12:17:09 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Wed May 17 12:29:26 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/common.py | 3 + aria/orchestrator/workflows/executor/process.py | 3 +- tests/helpers.py | 3 + .../orchestrator/workflows/executor/__init__.py | 3 + ...process_executor_concurrent_modifications.py | 62 ++++++++------------ 5 files changed, 36 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/725b7f04/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 9758bb5..83f7215 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -197,6 +197,9 @@ class BaseContext(object): resource_template = jinja2.Template(resource_content) return resource_template.render(variables) + def _teardown_db_resources(self): + self.model.log._session.close() + self.model.log._engine.dispose() class _Dict(collections.MutableMapping): def __init__(self, actor, model, nested=None): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/725b7f04/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index d15d878..e83584b 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -309,8 +309,7 @@ def _main(): task_func(ctx=ctx, **operation_inputs) messenger.succeeded() except BaseException as e: - ctx.model.log._session.close() - ctx.model.log._engine.dispose() + ctx._teardown_db_resources() messenger.failed(e) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/725b7f04/tests/helpers.py ---------------------------------------------------------------------- diff --git a/tests/helpers.py b/tests/helpers.py index 133e33b..4c3194b 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -47,6 +47,9 @@ class FilesystemDataHolder(object): with open(self._path, 'w') as f: return json.dump(value, f) + def __contains__(self, item): + return item in self._load() + def __setitem__(self, key, value): dict_ = self._load() dict_[key] = value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/725b7f04/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index 8ad8edb..da2ed8b 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -74,3 +74,6 @@ class MockContext(object): return cls(storage=aria.application_model_storage(**kwargs)) else: return cls() + + def _teardown_db_resources(cls): + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/725b7f04/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py index 8ed2f82..fe279ad 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -29,48 +29,37 @@ from tests.orchestrator.context import execute as execute_workflow from tests.orchestrator.workflows.helpers import events_collector from tests import mock from tests import storage +from tests import helpers -# TODO: rethink this entire module - -def test_concurrent_modification_on_task_succeeded(context, executor, lock_files): - _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True) - - -@operation -def _test_task_succeeded(ctx, lock_files, key, first_value, second_value): - _concurrent_update(lock_files, ctx.node, key, first_value, second_value) +@pytest.fixture +def dataholder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = helpers.FilesystemDataHolder(dataholder_path) + return holder -def test_concurrent_modification_on_task_failed(context, executor, lock_files): - _test(context, executor, lock_files, _test_task_failed, expected_failure=True) +def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False) @operation -def _test_task_failed(ctx, lock_files, key, first_value, second_value): - first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value) - if not first: - raise RuntimeError('MESSAGE') +def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path): + _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) -def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files): - _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False) +def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True) @operation -def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value): - node = ctx.node - first = _concurrent_update(lock_files, node, key, first_value, second_value) +def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path): + first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) if not first: - try: - ctx.model.node.update(node) - except StorageError: - ctx.model.node.refresh(node) - else: - raise RuntimeError('Unexpected') + raise RuntimeError('MESSAGE') -def _test(context, executor, lock_files, func, expected_failure): +def _test(context, executor, lock_files, func, dataholder, expected_failure): def _node(ctx): return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) @@ -83,7 +72,8 @@ def _test(context, executor, lock_files, func, expected_failure): 'lock_files': lock_files, 'key': key, 'first_value': first_value, - 'second_value': second_value + 'second_value': second_value, + 'holder_path': dataholder.path } node = _node(context) @@ -120,16 +110,12 @@ def _test(context, executor, lock_files, func, expected_failure): pass props = _node(context).attributes - assert props[key].value == first_value + assert dataholder['invocations'] == 2 + assert props[key].value == dataholder[key] exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])] if expected_failure: assert exceptions - exception = exceptions[-1] - assert isinstance(exception, StorageError) - assert 'Version conflict' in str(exception) - else: - assert not exceptions @pytest.fixture @@ -151,7 +137,8 @@ def lock_files(tmpdir): return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file')) -def _concurrent_update(lock_files, node, key, first_value, second_value): +def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path): + holder = helpers.FilesystemDataHolder(holder_path) locker1 = fasteners.InterProcessLock(lock_files[0]) locker2 = fasteners.InterProcessLock(lock_files[1]) @@ -161,11 +148,14 @@ def _concurrent_update(lock_files, node, key, first_value, second_value): # Give chance for both processes to acquire locks while locker2.acquire(blocking=False): locker2.release() - time.sleep(0.01) + time.sleep(0.1) else: locker2.acquire() node.attributes[key] = first_value if first else second_value + holder['key'] = first_value if first else second_value + holder.setdefault('invocations', 0) + holder['invocations'] += 1 if first: locker1.release()