Repository: incubator-ariatosca Updated Branches: refs/heads/master db9ae9c70 -> 4c75aeba3
ARIA-68 Update runtime properties during operation Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4c75aeba Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4c75aeba Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4c75aeba Branch: refs/heads/master Commit: 4c75aeba322761471b247dc4e29692cde20043e9 Parents: db9ae9c Author: Dan Kilman <d...@gigaspaces.com> Authored: Tue Jan 24 16:58:10 2017 +0200 Committer: Dan Kilman <d...@gigaspaces.com> Committed: Wed Jan 25 14:06:55 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 93 ++++++++++++++------ aria/storage/instrumentation.py | 18 +++- .../test_process_executor_tracked_changes.py | 62 ++++++++++++- tests/storage/test_instrumentation.py | 38 ++++++++ 4 files changed, 179 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4c75aeba/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 9fa0302..7d990fa 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -29,6 +29,7 @@ script_dir = os.path.dirname(__file__) if script_dir in sys.path: sys.path.remove(script_dir) +import contextlib import io import threading import socket @@ -136,37 +137,39 @@ class ProcessExecutor(base.BaseExecutor): while not self._stopped: try: # Accept messages written to the server socket - message = self._recv_message() - message_type = message['type'] - if message_type == 'closed': - break - task_id = message['task_id'] - if message_type == 'started': - self._task_started(self._tasks[task_id]) - elif message_type == 'succeeded': - task = self._remove_task(task_id) - instrumentation.apply_tracked_changes( - tracked_changes=message['tracked_changes'], - model=task.context.model) - self._task_succeeded(task) - elif message_type == 'failed': - task = self._remove_task(task_id) - instrumentation.apply_tracked_changes( - tracked_changes=message['tracked_changes'], - model=task.context.model) - self._task_failed(task, exception=message['exception']) - else: - raise RuntimeError('Invalid state') + with contextlib.closing(self._server_socket.accept()[0]) as connection: + message = self._recv_message(connection) + message_type = message['type'] + if message_type == 'closed': + break + task_id = message['task_id'] + if message_type == 'started': + self._task_started(self._tasks[task_id]) + elif message_type == 'apply_tracked_changes': + task = self._tasks[task_id] + instrumentation.apply_tracked_changes( + tracked_changes=message['tracked_changes'], + model=task.context.model) + elif message_type == 'succeeded': + task = self._remove_task(task_id) + instrumentation.apply_tracked_changes( + tracked_changes=message['tracked_changes'], + model=task.context.model) + self._task_succeeded(task) + elif message_type == 'failed': + task = self._remove_task(task_id) + instrumentation.apply_tracked_changes( + tracked_changes=message['tracked_changes'], + model=task.context.model) + self._task_failed(task, exception=message['exception']) + else: + raise RuntimeError('Invalid state') except BaseException as e: self.logger.debug('Error in process executor listener: {0}'.format(e)) - def _recv_message(self): - connection, _ = self._server_socket.accept() - try: - message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE)) - return jsonpickle.loads(self._recv_bytes(connection, message_len)) - finally: - connection.close() + def _recv_message(self, connection): + message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE)) + return jsonpickle.loads(self._recv_bytes(connection, message_len)) @staticmethod def _recv_bytes(connection, count): @@ -247,6 +250,9 @@ class _Messenger(object): """Task failed message""" self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception) + def apply_tracked_changes(self, tracked_changes): + self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes) + def closed(self): """Executor closed message""" self._send_message(type='closed') @@ -263,10 +269,40 @@ class _Messenger(object): }) sock.send(struct.pack(_INT_FMT, len(data))) sock.sendall(data) + # send message will block until the server side closes the connection socket + # because we want it to be synchronous + sock.recv(1) finally: sock.close() +def _patch_session(ctx, messenger, instrument): + # model will be None only in tests that test the executor component directly + if not ctx.model: + return + + # We arbitrarily select the ``node_instance`` mapi to extract the session from it. + # could have been any other mapi just as well + session = ctx.model.node_instance._session + original_refresh = session.refresh + + def patched_refresh(target): + instrument.clear(target) + original_refresh(target) + + def patched_commit(): + messenger.apply_tracked_changes(instrument.tracked_changes) + instrument.clear() + + # when autoflush is set to true (the default), refreshing an object will trigger + # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so + # far on the session. this is not the desired behavior in the subprocess + session.autoflush = False + + session.commit = patched_commit + session.refresh = patched_refresh + + def _main(): arguments_json_path = sys.argv[1] with open(arguments_json_path) as f: @@ -292,6 +328,7 @@ def _main(): with instrumentation.track_changes() as instrument: try: ctx = serialization.operation_context_from_dict(context_dict) + _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) task_func = imports.load_attribute(operation_mapping) aria.install_aria_extensions() for decorate in process_executor.decorate(): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4c75aeba/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index d71190d..537dbb5 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -75,7 +75,7 @@ class _Instrumentation(object): def _register_set_attribute_listener(self, instrumented_attribute, attribute_type): def listener(target, value, *_): - mapi_name = api.generate_lower_name(target.__class__) + mapi_name = self._mapi_name(target.__class__) tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) tracked_attributes = tracked_instances.setdefault(target.id, {}) if value is None: @@ -90,7 +90,7 @@ class _Instrumentation(object): def _register_instance_listeners(self, instrumented_class, instrumented_attributes): def listener(target, *_): - mapi_name = api.generate_lower_name(instrumented_class) + mapi_name = self._mapi_name(instrumented_class) tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) tracked_attributes = tracked_instances.setdefault(target.id, {}) for attribute_name, attribute_type in instrumented_attributes.items(): @@ -108,6 +108,14 @@ class _Instrumentation(object): sqlalchemy.event.listen(*listener_args) self.listeners.append(listener_args) + def clear(self, target=None): + if target: + mapi_name = self._mapi_name(target.__class__) + tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) + tracked_instances.pop(target.id, None) + else: + self.tracked_changes.clear() + def restore(self): """Remove all listeners registered by this instrumentation""" for listener_args in self.listeners: @@ -120,6 +128,10 @@ class _Instrumentation(object): def __exit__(self, exc_type, exc_val, exc_tb): self.restore() + @staticmethod + def _mapi_name(instrumented_class): + return api.generate_lower_name(instrumented_class) + class _Value(object): # You may wonder why is this a full blown class and not a named tuple. The reason is that @@ -155,3 +167,5 @@ def apply_tracked_changes(tracked_changes, model): if not instance: instance = mapi.get(instance_id) setattr(instance, attribute_name, value.current) + if instance: + mapi.update(instance) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4c75aeba/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 1564292..bd1fa96 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy + import pytest from aria.orchestrator.workflows import api @@ -52,18 +54,51 @@ def _update_runtime_properties(context): context.node_instance.runtime_properties.update(_TEST_RUNTIME_PROPERTIES) -def _run_workflow(context, executor, op_func): +def test_refresh_state_of_tracked_attributes(context, executor): + out = _run_workflow(context=context, executor=executor, op_func=_mock_refreshing_operation) + assert out['initial'] == out['after_refresh'] + assert out['initial'] != out['after_change'] + + +def test_apply_tracked_changes_during_an_operation(context, executor): + inputs = { + 'committed': {'some': 'new', 'properties': 'right here'}, + 'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'} + } + + expected_initial = context.model.node_instance.get_by_name( + mock.models.DEPENDENCY_NODE_INSTANCE_NAME).runtime_properties + + out = _run_workflow(context=context, executor=executor, op_func=_mock_updating_operation, + inputs=inputs) + + expected_after_update = expected_initial.copy() + expected_after_update.update(inputs['committed']) + expected_after_change = expected_after_update.copy() + expected_after_change.update(inputs['changed_but_refreshed']) + expected_after_refresh = expected_after_update + + assert out['initial'] == expected_initial + assert out['after_update'] == expected_after_update + assert out['after_change'] == expected_after_change + assert out['after_refresh'] == expected_after_refresh + + +def _run_workflow(context, executor, op_func, inputs=None): @workflow def mock_workflow(ctx, graph): node_instance = ctx.model.node_instance.get_by_name( mock.models.DEPENDENCY_NODE_INSTANCE_NAME) node_instance.node.operations['test.op'] = {'operation': _operation_mapping(op_func)} - task = api.task.OperationTask.node_instance(instance=node_instance, name='test.op') + task = api.task.OperationTask.node_instance(instance=node_instance, name='test.op', + inputs=inputs or {}) graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) eng.execute() + return context.model.node_instance.get_by_name( + mock.models.DEPENDENCY_NODE_INSTANCE_NAME).runtime_properties.get('out') @operation @@ -77,6 +112,29 @@ def _mock_fail_operation(ctx): raise RuntimeError +@operation +def _mock_refreshing_operation(ctx): + out = {'initial': copy.deepcopy(ctx.node_instance.runtime_properties)} + ctx.node_instance.runtime_properties.update({'some': 'new', 'properties': 'right here'}) + out['after_change'] = copy.deepcopy(ctx.node_instance.runtime_properties) + ctx.model.node_instance.refresh(ctx.node_instance) + out['after_refresh'] = copy.deepcopy(ctx.node_instance.runtime_properties) + ctx.node_instance.runtime_properties['out'] = out + + +@operation +def _mock_updating_operation(ctx, committed, changed_but_refreshed): + out = {'initial': copy.deepcopy(ctx.node_instance.runtime_properties)} + ctx.node_instance.runtime_properties.update(committed) + ctx.model.node_instance.update(ctx.node_instance) + out['after_update'] = copy.deepcopy(ctx.node_instance.runtime_properties) + ctx.node_instance.runtime_properties.update(changed_but_refreshed) + out['after_change'] = copy.deepcopy(ctx.node_instance.runtime_properties) + ctx.model.node_instance.refresh(ctx.node_instance) + out['after_refresh'] = copy.deepcopy(ctx.node_instance.runtime_properties) + ctx.node_instance.runtime_properties['out'] = out + + def _operation_mapping(func): return '{name}.{func.__name__}'.format(name=__name__, func=func) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4c75aeba/tests/storage/test_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py index 8e7f9aa..8b826e9 100644 --- a/tests/storage/test_instrumentation.py +++ b/tests/storage/test_instrumentation.py @@ -232,6 +232,44 @@ class TestInstrumentation(object): assert instance2_1.dict1 == {'initial': 'value', 'new': 'value'} assert instance2_2.list1 == ['initial', 'new_value'] + def test_clear_instance(self, storage): + instance1 = MockModel1(name='name1') + instance2 = MockModel1(name='name2') + for instance in [instance1, instance2]: + storage.mock_model_1.put(instance) + instrument = self._track_changes({MockModel1.dict1: dict}) + instance1.dict1 = {'new': 'value'} + instance2.dict1 = {'new2': 'value2'} + assert instrument.tracked_changes == { + 'mock_model_1': { + instance1.id: {'dict1': Value(STUB, {'new': 'value'})}, + instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})} + } + } + instrument.clear(instance1) + assert instrument.tracked_changes == { + 'mock_model_1': { + instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})} + } + } + + def test_clear_all(self, storage): + instance1 = MockModel1(name='name1') + instance2 = MockModel1(name='name2') + for instance in [instance1, instance2]: + storage.mock_model_1.put(instance) + instrument = self._track_changes({MockModel1.dict1: dict}) + instance1.dict1 = {'new': 'value'} + instance2.dict1 = {'new2': 'value2'} + assert instrument.tracked_changes == { + 'mock_model_1': { + instance1.id: {'dict1': Value(STUB, {'new': 'value'})}, + instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})} + } + } + instrument.clear() + assert instrument.tracked_changes == {} + def _track_changes(self, instrumented): instrument = instrumentation.track_changes(instrumented) instruments_holder.append(instrument)