ARIA-258 Convert runtime_properties to attributes
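In short: the free-form runtime_properties dict on Node is dropped, and node state now lives in the
attributes collection, whose items are Parameter models. A minimal sketch of the resulting access
pattern (the helper functions and the passed-in node are illustrative and not part of the commit;
Parameter.wrap and the .value accessor are taken from the diff below):

    from aria.modeling import models

    def set_host_ip(node, ip):
        # attributes hold Parameter models rather than plain values
        node.attributes['ip'] = models.Parameter.wrap('ip', ip)

    def get_host_ip(node):
        # reading back goes through the Parameter's .value
        param = node.attributes.get('ip')
        return param.value if param else None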
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/50b997e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/50b997e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/50b997e3

Branch: refs/heads/ARIA-149-functions-in-operation-configuration
Commit: 50b997e3bfbaf26df5e66327d30fe8a015b92dd7
Parents: 0c98684
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun May 14 22:38:39 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 25 18:30:21 2017 +0300

----------------------------------------------------------------------
 aria/cli/commands/nodes.py                      |   6 +-
 aria/modeling/service_common.py                 |   9 +-
 aria/modeling/service_instance.py               |   8 +-
 aria/modeling/service_template.py               |   1 -
 aria/modeling/types.py                          |  20 -
 .../context/collection_instrumentation.py       | 242 ++++++++++++
 aria/orchestrator/context/operation.py          |  13 +-
 aria/orchestrator/context/toolbelt.py           |   5 +-
 .../execution_plugin/ctx_proxy/server.py        |   1 -
 aria/orchestrator/workflows/core/engine.py      |   1 -
 aria/orchestrator/workflows/executor/process.py | 125 +-----
 aria/storage/instrumentation.py                 | 282 -------------
 tests/helpers.py                                |  10 +
 tests/mock/models.py                            |   7 +-
 tests/modeling/test_mixins.py                   |   1 -
 tests/modeling/test_models.py                   |  28 +-
 .../context/test_collection_instrumentation.py  | 253 ++++++++++++
 tests/orchestrator/context/test_operation.py    |  90 ++++-
 tests/orchestrator/context/test_toolbelt.py     |   5 +-
 .../orchestrator/execution_plugin/test_local.py |  66 ++--
 tests/orchestrator/execution_plugin/test_ssh.py |  36 +-
 tests/orchestrator/workflows/core/test_task.py  |   2 +-
 .../orchestrator/workflows/executor/__init__.py |   4 +
 ...process_executor_concurrent_modifications.py |  67 ++--
 .../executor/test_process_executor_extension.py |   6 +-
 .../test_process_executor_tracked_changes.py    |  56 ++-
 tests/resources/scripts/test_ssh.sh             |  30 +-
 tests/storage/test_instrumentation.py           | 396 -------------------
 28 files changed, 786 insertions(+), 984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/cli/commands/nodes.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/nodes.py b/aria/cli/commands/nodes.py
index e43493f..1bbefe6 100644
--- a/aria/cli/commands/nodes.py
+++ b/aria/cli/commands/nodes.py
@@ -47,9 +47,9 @@ def show(node_id, model_storage, logger):
     # print node attributes
     logger.info('Node attributes:')
-    if node.runtime_properties:
-        for prop_name, prop_value in node.runtime_properties.iteritems():
-            logger.info('\t{0}: {1}'.format(prop_name, prop_value))
+    if node.attributes:
+        for param_name, param in node.attributes.iteritems():
+            logger.info('\t{0}: {1}'.format(param_name, param.value))
     else:
         logger.info('\tNo attributes')


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/modeling/service_common.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_common.py b/aria/modeling/service_common.py
index e9c96a4..ef19c8e 100644
--- a/aria/modeling/service_common.py
+++ b/aria/modeling/service_common.py
@@ -218,14 +218,13 @@ class ParameterBase(TemplateModelMixin, caching.HasCachedMethods):
         :type description: basestring
         """
-        from .
import models type_name = canonical_type_name(value) if type_name is None: type_name = full_type_name(value) - return models.Parameter(name=name, # pylint: disable=unexpected-keyword-arg - type_name=type_name, - value=value, - description=description) + return cls(name=name, # pylint: disable=unexpected-keyword-arg + type_name=type_name, + value=value, + description=description) class TypeBase(InstanceModelMixin): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/modeling/service_instance.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_instance.py b/aria/modeling/service_instance.py index 41a388d..7058969 100644 --- a/aria/modeling/service_instance.py +++ b/aria/modeling/service_instance.py @@ -333,8 +333,6 @@ class NodeBase(InstanceModelMixin): :vartype inbound_relationships: [:class:`Relationship`] :ivar host: Host node (can be self) :vartype host: :class:`Node` - :ivar runtime_properties: TODO: should be replaced with attributes - :vartype runtime_properties: {} :ivar state: The state of the node, according to to the TOSCA-defined node states :vartype state: string :ivar version: Used by `aria.storage.instrumentation` @@ -520,7 +518,6 @@ class NodeBase(InstanceModelMixin): # endregion description = Column(Text) - runtime_properties = Column(modeling_types.Dict) state = Column(Enum(*STATES, name='node_state'), nullable=False, default=INITIAL) version = Column(Integer, default=1) @@ -528,8 +525,9 @@ class NodeBase(InstanceModelMixin): @property def host_address(self): - if self.host and self.host.runtime_properties: - return self.host.runtime_properties.get('ip') + if self.host and self.host.attributes: + attribute = self.host.attributes.get('ip') + return attribute.value if attribute else None return None def satisfy_requirements(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/modeling/service_template.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py index 12195a1..3110248 100644 --- a/aria/modeling/service_template.py +++ b/aria/modeling/service_template.py @@ -562,7 +562,6 @@ class NodeTemplateBase(TemplateModelMixin): type=self.type, description=deepcopy_with_locators(self.description), state=models.Node.INITIAL, - runtime_properties={}, node_template=self) utils.instantiate_dict(node, node.properties, self.properties) utils.instantiate_dict(node, node.attributes, self.attributes) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/modeling/types.py ---------------------------------------------------------------------- diff --git a/aria/modeling/types.py b/aria/modeling/types.py index 7460f47..920a0c2 100644 --- a/aria/modeling/types.py +++ b/aria/modeling/types.py @@ -286,24 +286,4 @@ _LISTENER_ARGS = (mutable.mapper, 'mapper_configured', _mutable_association_list def _register_mutable_association_listener(): event.listen(*_LISTENER_ARGS) - -def remove_mutable_association_listener(): - """ - Remove the event listener that associates ``Dict`` and ``List`` column types with - ``MutableDict`` and ``MutableList``, respectively. - - This call must happen before any model instance is instantiated. - This is because once it does, that would trigger the listener we are trying to remove. - Once it is triggered, many other listeners will then be registered. - At that point, it is too late. 
- - The reason this function exists is that the association listener, interferes with ARIA change - tracking instrumentation, so a way to disable it is required. - - Note that the event listener this call removes is registered by default. - """ - if event.contains(*_LISTENER_ARGS): - event.remove(*_LISTENER_ARGS) - - _register_mutable_association_listener() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/context/collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/collection_instrumentation.py b/aria/orchestrator/context/collection_instrumentation.py new file mode 100644 index 0000000..91cfd35 --- /dev/null +++ b/aria/orchestrator/context/collection_instrumentation.py @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from functools import partial + +from aria.modeling import models + + +class _InstrumentedCollection(object): + + def __init__(self, + model, + parent, + field_name, + seq=None, + is_top_level=True, + **kwargs): + self._model = model + self._parent = parent + self._field_name = field_name + self._is_top_level = is_top_level + self._load(seq, **kwargs) + + @property + def _raw(self): + raise NotImplementedError + + def _load(self, seq, **kwargs): + """ + Instantiates the object from existing seq. + + :param seq: the original sequence to load from + :return: + """ + raise NotImplementedError + + def _set(self, key, value): + """ + set the changes for the current object (not in the db) + + :param key: + :param value: + :return: + """ + raise NotImplementedError + + def _del(self, collection, key): + raise NotImplementedError + + def _instrument(self, key, value): + """ + Instruments any collection to track changes (and ease of access) + :param key: + :param value: + :return: + """ + if isinstance(value, _InstrumentedCollection): + return value + elif isinstance(value, dict): + instrumentation_cls = _InstrumentedDict + elif isinstance(value, list): + instrumentation_cls = _InstrumentedList + else: + return value + + return instrumentation_cls(self._model, self, key, value, False) + + @staticmethod + def _raw_value(value): + """ + Get the raw value. + :param value: + :return: + """ + if isinstance(value, models.Parameter): + return value.value + return value + + @staticmethod + def _encapsulate_value(key, value): + """ + Create a new item cls if needed. + :param key: + :param value: + :return: + """ + if isinstance(value, models.Parameter): + return value + # If it is not wrapped + return models.Parameter.wrap(key, value) + + def __setitem__(self, key, value): + """ + Update the values in both the local and the db locations. 
+ :param key: + :param value: + :return: + """ + self._set(key, value) + if self._is_top_level: + # We are at the top level + field = getattr(self._parent, self._field_name) + mapi = getattr(self._model, models.Parameter.__modelname__) + value = self._set_field(field, + key, + value if key in field else self._encapsulate_value(key, value)) + mapi.update(value) + else: + # We are not at the top level + self._set_field(self._parent, self._field_name, self) + + def _set_field(self, collection, key, value): + """ + enables updating the current change in the ancestors + :param collection: the collection to change + :param key: the key for the specific field + :param value: the new value + :return: + """ + if isinstance(value, _InstrumentedCollection): + value = value._raw + if key in collection and isinstance(collection[key], models.Parameter): + if isinstance(collection[key], _InstrumentedCollection): + self._del(collection, key) + collection[key].value = value + else: + collection[key] = value + return collection[key] + + def __deepcopy__(self, *args, **kwargs): + return self._raw + + +class _InstrumentedDict(_InstrumentedCollection, dict): + + def _load(self, dict_=None, **kwargs): + dict.__init__( + self, + tuple((key, self._raw_value(value)) for key, value in (dict_ or {}).items()), + **kwargs) + + def update(self, dict_=None, **kwargs): + dict_ = dict_ or {} + for key, value in dict_.items(): + self[key] = value + for key, value in kwargs.items(): + self[key] = value + + def __getitem__(self, key): + return self._instrument(key, dict.__getitem__(self, key)) + + def _set(self, key, value): + dict.__setitem__(self, key, self._raw_value(value)) + + @property + def _raw(self): + return dict(self) + + def _del(self, collection, key): + del collection[key] + + +class _InstrumentedList(_InstrumentedCollection, list): + + def _load(self, list_=None, **kwargs): + list.__init__(self, list(item for item in list_ or [])) + + def append(self, value): + self.insert(len(self), value) + + def insert(self, index, value): + list.insert(self, index, self._raw_value(value)) + if self._is_top_level: + field = getattr(self._parent, self._field_name) + field.insert(index, self._encapsulate_value(index, value)) + else: + self._parent[self._field_name] = self + + def __getitem__(self, key): + return self._instrument(key, list.__getitem__(self, key)) + + def _set(self, key, value): + list.__setitem__(self, key, value) + + def _del(self, collection, key): + del collection[key] + + @property + def _raw(self): + return list(self) + + +class _InstrumentedModel(object): + + def __init__(self, field_name, original_model, model_storage): + super(_InstrumentedModel, self).__init__() + self._field_name = field_name + self._model_storage = model_storage + self._original_model = original_model + self._apply_instrumentation() + + def __getattr__(self, item): + return getattr(self._original_model, item) + + def _apply_instrumentation(self): + + field = getattr(self._original_model, self._field_name) + + # Preserve the original value. e.g. 
original attributes would be located under + # _attributes + setattr(self, '_{0}'.format(self._field_name), field) + + # set instrumented value + setattr(self, self._field_name, _InstrumentedDict(self._model_storage, + self._original_model, + self._field_name, + field)) + + +def instrument_collection(field_name, func=None): + if func is None: + return partial(instrument_collection, field_name) + + def _wrapper(*args, **kwargs): + original_model = func(*args, **kwargs) + return type('Instrumented{0}'.format(original_model.__class__.__name__), + (_InstrumentedModel, ), + {})(field_name, original_model, args[0].model) + + return _wrapper http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 0ce790f..7c21351 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -21,10 +21,13 @@ import threading import aria from aria.utils import file -from .common import BaseContext +from . import ( + common, + collection_instrumentation +) -class BaseOperationContext(BaseContext): +class BaseOperationContext(common.BaseContext): """ Context object used during operation creation and execution """ @@ -114,6 +117,7 @@ class NodeOperationContext(BaseOperationContext): """ @property + @collection_instrumentation.instrument_collection('attributes') def node_template(self): """ the node of the current operation @@ -122,6 +126,7 @@ class NodeOperationContext(BaseOperationContext): return self.node.node_template @property + @collection_instrumentation.instrument_collection('attributes') def node(self): """ The node instance of the current operation @@ -136,6 +141,7 @@ class RelationshipOperationContext(BaseOperationContext): """ @property + @collection_instrumentation.instrument_collection('attributes') def source_node_template(self): """ The source node @@ -144,6 +150,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.source_node.node_template @property + @collection_instrumentation.instrument_collection('attributes') def source_node(self): """ The source node instance @@ -152,6 +159,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.relationship.source_node @property + @collection_instrumentation.instrument_collection('attributes') def target_node_template(self): """ The target node @@ -160,6 +168,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.target_node.node_template @property + @collection_instrumentation.instrument_collection('attributes') def target_node(self): """ The target node instance http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py index def7d42..5788ee7 100644 --- a/aria/orchestrator/context/toolbelt.py +++ b/aria/orchestrator/context/toolbelt.py @@ -34,7 +34,10 @@ class NodeToolBelt(object): """ assert isinstance(self._op_context, operation.NodeOperationContext) host = self._op_context.node.host - return host.runtime_properties.get('ip') + ip = host.attributes.get('ip') + if ip: + return ip.value + class RelationshipToolBelt(object): 
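For reference, a short usage sketch of the instrumentation added above: context properties decorated
with instrument_collection('attributes') return wrapped collections, so plain reads and writes from
inside an operation are tracked and persisted as Parameter rows. The operation body and the
attribute name here are hypothetical, not part of the commit:

    from aria import operation

    @operation
    def configure(ctx, **_):
        # writes go through the instrumented dict and are stored via the model storage
        ctx.node.attributes['endpoint'] = 'http://10.0.0.1:8080'
        # reads return the raw value, not the Parameter wrapper
        endpoint = ctx.node.attributes['endpoint']
        ctx.logger.info('configured endpoint: {0}'.format(endpoint))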
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 1ce0e08..102ff9a 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -98,7 +98,6 @@ class CtxProxy(object): quiet=True, server=BottleServerAdapter) thread = threading.Thread(target=serve) - thread.daemon = True thread.start() return thread http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 561265c..3a96804 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -69,7 +69,6 @@ class Engine(logger.LoggerMixin): else: events.on_success_workflow_signal.send(self._workflow_context) except BaseException as e: - events.on_failure_workflow_signal.send(self._workflow_context, exception=e) raise http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index da6bbb2..f02e0a6 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -43,14 +43,12 @@ import jsonpickle import aria from aria.orchestrator.workflows.executor import base -from aria.storage import instrumentation from aria.extension import process_executor from aria.utils import ( imports, exceptions, process as process_utils ) -from aria.modeling import types as modeling_types _INT_FMT = 'I' @@ -82,7 +80,6 @@ class ProcessExecutor(base.BaseExecutor): 'started': self._handle_task_started_request, 'succeeded': self._handle_task_succeeded_request, 'failed': self._handle_task_failed_request, - 'apply_tracked_changes': self._handle_apply_tracked_changes_request } # Server socket used to accept task status messages from subprocesses @@ -196,41 +193,13 @@ class ProcessExecutor(base.BaseExecutor): def _handle_task_started_request(self, task_id, **kwargs): self._task_started(self._tasks[task_id]) - def _handle_task_succeeded_request(self, task_id, request, **kwargs): + def _handle_task_succeeded_request(self, task_id, **kwargs): task = self._remove_task(task_id) - try: - self._apply_tracked_changes(task, request) - except BaseException as e: - e.message += UPDATE_TRACKED_CHANGES_FAILED_STR - self._task_failed(task, exception=e) - else: - self._task_succeeded(task) + self._task_succeeded(task) def _handle_task_failed_request(self, task_id, request, **kwargs): task = self._remove_task(task_id) - try: - self._apply_tracked_changes(task, request) - except BaseException as e: - e.message += 'Task failed due to {0}.'.format(request['exception']) + \ - UPDATE_TRACKED_CHANGES_FAILED_STR - self._task_failed( - task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info())) - else: - self._task_failed(task, exception=request['exception'], traceback=request['traceback']) - - def _handle_apply_tracked_changes_request(self, task_id, request, response): 
- task = self._tasks[task_id] - try: - self._apply_tracked_changes(task, request) - except BaseException as e: - response['exception'] = exceptions.wrap_if_needed(e) - - @staticmethod - def _apply_tracked_changes(task, request): - instrumentation.apply_tracked_changes( - tracked_changes=request['tracked_changes'], - new_instances=request['new_instances'], - model=task.context.model) + self._task_failed(task, exception=request['exception'], traceback=request['traceback']) def _send_message(connection, message): @@ -278,28 +247,19 @@ class _Messenger(object): """Task started message""" self._send_message(type='started') - def succeeded(self, tracked_changes, new_instances): + def succeeded(self): """Task succeeded message""" - self._send_message( - type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances) + self._send_message(type='succeeded') - def failed(self, tracked_changes, new_instances, exception): + def failed(self, exception): """Task failed message""" - self._send_message(type='failed', - tracked_changes=tracked_changes, - new_instances=new_instances, - exception=exception) - - def apply_tracked_changes(self, tracked_changes, new_instances): - self._send_message(type='apply_tracked_changes', - tracked_changes=tracked_changes, - new_instances=new_instances) + self._send_message(type='failed', exception=exception) def closed(self): """Executor closed message""" self._send_message(type='closed') - def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None): + def _send_message(self, type, exception=None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(('localhost', self.port)) try: @@ -308,8 +268,6 @@ class _Messenger(object): 'task_id': self.task_id, 'exception': exceptions.wrap_if_needed(exception), 'traceback': exceptions.get_exception_as_string(*sys.exc_info()), - 'tracked_changes': tracked_changes or {}, - 'new_instances': new_instances or {} }) response = _recv_message(sock) response_exception = response.get('exception') @@ -319,39 +277,6 @@ class _Messenger(object): sock.close() -def _patch_ctx(ctx, messenger, instrument): - # model will be None only in tests that test the executor component directly - if not ctx.model: - return - - # We arbitrarily select the ``node`` mapi to extract the session from it. - # could have been any other mapi just as well - session = ctx.model.node._session - original_refresh = session.refresh - - def patched_refresh(target): - instrument.clear(target) - original_refresh(target) - - def patched_commit(): - messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances) - instrument.expunge_session() - instrument.clear() - - def patched_rollback(): - # Rollback is performed on parent process when commit fails - instrument.expunge_session() - - # when autoflush is set to true (the default), refreshing an object will trigger - # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so - # far on the session. this is not the desired behavior in the subprocess - session.autoflush = False - - session.commit = patched_commit - session.rollback = patched_rollback - session.refresh = patched_refresh - - def _main(): arguments_json_path = sys.argv[1] with open(arguments_json_path) as f: @@ -369,32 +294,24 @@ def _main(): operation_inputs = arguments['operation_inputs'] context_dict = arguments['context'] - # This is required for the instrumentation work properly. 
- # See docstring of `remove_mutable_association_listener` for further details - modeling_types.remove_mutable_association_listener() try: ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context']) except BaseException as e: - messenger.failed(exception=e, tracked_changes=None, new_instances=None) + messenger.failed(e) return - with instrumentation.track_changes(ctx.model) as instrument: - try: - messenger.started() - _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument) - task_func = imports.load_attribute(implementation) - aria.install_aria_extensions() - for decorate in process_executor.decorate(): - task_func = decorate(task_func) - task_func(ctx=ctx, **operation_inputs) - messenger.succeeded(tracked_changes=instrument.tracked_changes, - new_instances=instrument.new_instances) - except BaseException as e: - messenger.failed(exception=e, - tracked_changes=instrument.tracked_changes, - new_instances=instrument.new_instances) - finally: - instrument.expunge_session() + try: + messenger.started() + task_func = imports.load_attribute(implementation) + aria.install_aria_extensions() + for decorate in process_executor.decorate(): + task_func = decorate(task_func) + task_func(ctx=ctx, **operation_inputs) + ctx.close() + messenger.succeeded() + except BaseException as e: + ctx.close() + messenger.failed(e) if __name__ == '__main__': _main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py deleted file mode 100644 index 390f933..0000000 --- a/aria/storage/instrumentation.py +++ /dev/null @@ -1,282 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy -import json -import os - -import sqlalchemy.event - -from ..modeling import models as _models -from ..storage.exceptions import StorageError - - -_VERSION_ID_COL = 'version' -_STUB = object() -_INSTRUMENTED = { - 'modified': { - _models.Node.runtime_properties: dict, - _models.Node.state: str, - _models.Task.status: str, - }, - 'new': (_models.Log, ) - -} - -_NEW_INSTANCE = 'NEW_INSTANCE' - - -def track_changes(model=None, instrumented=None): - """Track changes in the specified model columns - - This call will register event listeners using sqlalchemy's event mechanism. The listeners - instrument all returned objects such that the attributes specified in ``instrumented``, will - be replaced with a value that is stored in the returned instrumentation context - ``tracked_changes`` property. - - Why should this be implemented when sqlalchemy already does a fantastic job at tracking changes - you ask? 
Well, when sqlalchemy is used with sqlite, due to how sqlite works, only one process - can hold a write lock to the database. This does not work well when ARIA runs tasks in - subprocesses (by the process executor) and these tasks wish to change some state as well. These - tasks certainly deserve a chance to do so! - - To enable this, the subprocess calls ``track_changes()`` before any state changes are made. - At the end of the subprocess execution, it should return the ``tracked_changes`` attribute of - the instrumentation context returned from this call, to the parent process. The parent process - will then call ``apply_tracked_changes()`` that resides in this module as well. - At that point, the changes will actually be written back to the database. - - :param model: the model storage. it should hold a mapi for each model. the session of each mapi - is needed to setup events - :param instrumented: A dict from model columns to their python native type - :return: The instrumentation context - """ - return _Instrumentation(model, instrumented or _INSTRUMENTED) - - -class _Instrumentation(object): - - def __init__(self, model, instrumented): - self.tracked_changes = {} - self.new_instances = {} - self.listeners = [] - self._instances_to_expunge = [] - self._model = model - self._track_changes(instrumented) - - @property - def _new_instance_id(self): - return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, - index=len(self._instances_to_expunge)) - - def expunge_session(self): - for new_instance in self._instances_to_expunge: - self._get_session_from_model(new_instance.__tablename__).expunge(new_instance) - - def _get_session_from_model(self, tablename): - mapi = getattr(self._model, tablename, None) - if mapi: - return mapi._session - raise StorageError("Could not retrieve session for {0}".format(tablename)) - - def _track_changes(self, instrumented): - instrumented_attribute_classes = {} - # Track any newly-set attributes. - for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items(): - self._register_set_attribute_listener( - instrumented_attribute=instrumented_attribute, - attribute_type=attribute_type) - instrumented_class = instrumented_attribute.parent.entity - instrumented_class_attributes = instrumented_attribute_classes.setdefault( - instrumented_class, {}) - instrumented_class_attributes[instrumented_attribute.key] = attribute_type - - # Track any global instance update such as 'refresh' or 'load' - for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items(): - self._register_instance_listeners(instrumented_class=instrumented_class, - instrumented_attributes=instrumented_attributes) - - # Track any newly created instances. 
- for instrumented_class in instrumented.get('new', {}): - self._register_new_instance_listener(instrumented_class) - - def _register_new_instance_listener(self, instrumented_class): - if self._model is None: - raise StorageError("In order to keep track of new instances, a ctx is needed") - - def listener(_, instance): - if not isinstance(instance, instrumented_class): - return - self._instances_to_expunge.append(instance) - tracked_instances = self.new_instances.setdefault(instance.__modelname__, {}) - tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {}) - instance_as_dict = instance.to_dict() - instance_as_dict.update((k, getattr(instance, k)) - for k in getattr(instance, '__private_fields__', [])) - tracked_attributes.update(instance_as_dict) - session = self._get_session_from_model(instrumented_class.__tablename__) - listener_args = (session, 'after_attach', listener) - sqlalchemy.event.listen(*listener_args) - self.listeners.append(listener_args) - - def _register_set_attribute_listener(self, instrumented_attribute, attribute_type): - def listener(target, value, *_): - mapi_name = target.__modelname__ - tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) - tracked_attributes = tracked_instances.setdefault(target.id, {}) - if value is None: - current = None - else: - current = copy.deepcopy(attribute_type(value)) - tracked_attributes[instrumented_attribute.key] = _Value(_STUB, current) - return current - listener_args = (instrumented_attribute, 'set', listener) - sqlalchemy.event.listen(*listener_args, retval=True) - self.listeners.append(listener_args) - - def _register_instance_listeners(self, instrumented_class, instrumented_attributes): - def listener(target, *_): - mapi_name = instrumented_class.__modelname__ - tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) - tracked_attributes = tracked_instances.setdefault(target.id, {}) - if hasattr(target, _VERSION_ID_COL): - # We want to keep track of the initial version id so it can be compared - # with the committed version id when the tracked changes are applied - tracked_attributes.setdefault(_VERSION_ID_COL, - _Value(_STUB, getattr(target, _VERSION_ID_COL))) - for attribute_name, attribute_type in instrumented_attributes.items(): - if attribute_name not in tracked_attributes: - initial = getattr(target, attribute_name) - if initial is None: - current = None - else: - current = copy.deepcopy(attribute_type(initial)) - tracked_attributes[attribute_name] = _Value(initial, current) - target.__dict__[attribute_name] = tracked_attributes[attribute_name].current - for listener_args in ((instrumented_class, 'load', listener), - (instrumented_class, 'refresh', listener), - (instrumented_class, 'refresh_flush', listener)): - sqlalchemy.event.listen(*listener_args) - self.listeners.append(listener_args) - - def clear(self, target=None): - if target: - mapi_name = target.__modelname__ - tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) - tracked_instances.pop(target.id, None) - else: - self.tracked_changes.clear() - - self.new_instances.clear() - self._instances_to_expunge = [] - - def restore(self): - """Remove all listeners registered by this instrumentation""" - for listener_args in self.listeners: - if sqlalchemy.event.contains(*listener_args): - sqlalchemy.event.remove(*listener_args) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.restore() - - -class _Value(object): - # You may wonder why is this a full blown class and 
not a named tuple. The reason is that - # jsonpickle that is used to serialize the tracked_changes, does not handle named tuples very - # well. At the very least, I could not get it to behave. - - def __init__(self, initial, current): - self.initial = initial - self.current = current - - def __eq__(self, other): - if not isinstance(other, _Value): - return False - return self.initial == other.initial and self.current == other.current - - def __hash__(self): - return hash((self.initial, self.current)) - - @property - def dict(self): - return {'initial': self.initial, 'current': self.current}.copy() - - -def apply_tracked_changes(tracked_changes, new_instances, model): - """Write tracked changes back to the database using provided model storage - - :param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context - returned by calling ``track_changes()`` - :param model: The model storage used to actually apply the changes - """ - successfully_updated_changes = dict() - try: - # handle instance updates - for mapi_name, tracked_instances in tracked_changes.items(): - successfully_updated_changes[mapi_name] = dict() - mapi = getattr(model, mapi_name) - for instance_id, tracked_attributes in tracked_instances.items(): - successfully_updated_changes[mapi_name][instance_id] = dict() - instance = None - for attribute_name, value in tracked_attributes.items(): - if value.initial != value.current: - instance = instance or mapi.get(instance_id) - setattr(instance, attribute_name, value.current) - if instance: - _validate_version_id(instance, mapi) - mapi.update(instance) - successfully_updated_changes[mapi_name][instance_id] = [ - v.dict for v in tracked_attributes.values()] - - # Handle new instances - for mapi_name, new_instance in new_instances.items(): - successfully_updated_changes[mapi_name] = dict() - mapi = getattr(model, mapi_name) - for new_instance_kwargs in new_instance.values(): - instance = mapi.model_cls(**new_instance_kwargs) - mapi.put(instance) - successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs - except BaseException: - for key, value in successfully_updated_changes.items(): - if not value: - del successfully_updated_changes[key] - # TODO: if the successful has _STUB, the logging fails because it can't serialize the object - model.logger.error( - 'Registering all the changes to the storage has failed. {0}' - 'The successful updates were: {0} ' - '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4))) - - raise - - -def _validate_version_id(instance, mapi): - version_id = sqlalchemy.inspect(instance).committed_state.get(_VERSION_ID_COL) - # There are two version conflict code paths: - # 1. The instance committed state loaded already holds a newer version, - # in this case, we manually raise the error - # 2. The UPDATE statement is executed with version validation and sqlalchemy - # will raise a StateDataError if there is a version mismatch. 
- if version_id and getattr(instance, _VERSION_ID_COL) != version_id: - object_version_id = getattr(instance, _VERSION_ID_COL) - mapi._session.rollback() - raise StorageError( - 'Version conflict: committed and object {0} differ ' - '[committed {0}={1}, object {0}={2}]' - .format(_VERSION_ID_COL, - version_id, - object_version_id)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/helpers.py ---------------------------------------------------------------------- diff --git a/tests/helpers.py b/tests/helpers.py index 3c3efc9..4c3194b 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -47,6 +47,9 @@ class FilesystemDataHolder(object): with open(self._path, 'w') as f: return json.dump(value, f) + def __contains__(self, item): + return item in self._load() + def __setitem__(self, key, value): dict_ = self._load() dict_[key] = value @@ -67,6 +70,13 @@ class FilesystemDataHolder(object): self._dump(dict_) return return_value + def update(self, dict_=None, **kwargs): + current_dict = self._load() + if dict_: + current_dict.update(dict_) + current_dict.update(**kwargs) + self._dump(current_dict) + @property def path(self): return self._path http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index f066551..98703d5 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -120,7 +120,7 @@ def create_node_with_dependencies(include_attribute=False): node_template.service_template.services[0] = create_service(node_template.service_template) node = create_node(node_template, node_template.service_template.services[0]) if include_attribute: - node.runtime_properties = {'attribute1': 'value1'} + node.attributes['attribute1'] = models.Parameter.wrap('attribute1', 'value1') # pylint: disable=unsubscriptable-object return node @@ -184,13 +184,10 @@ def create_dependent_node_template( ) -def create_node(dependency_node_template, service, name=NODE_NAME, state=models.Node.INITIAL, - runtime_properties=None): - runtime_properties = runtime_properties or {} +def create_node(dependency_node_template, service, name=NODE_NAME, state=models.Node.INITIAL): node = models.Node( name=name, type=dependency_node_template.type, - runtime_properties=runtime_properties, version=None, node_template=dependency_node_template, state=state, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/modeling/test_mixins.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_mixins.py b/tests/modeling/test_mixins.py index a18a04e..2c91a4b 100644 --- a/tests/modeling/test_mixins.py +++ b/tests/modeling/test_mixins.py @@ -121,7 +121,6 @@ def test_relationship_model_ordering(context): new_node = modeling.models.Node( name='new_node', type=source_node.type, - runtime_properties={}, service=service, version=None, node_template=new_node_template, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index 61034bd..57511dd 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -538,22 +538,20 @@ class TestNodeTemplate(object): class TestNode(object): @pytest.mark.parametrize( - 'is_valid, name, runtime_properties, state, version', + 'is_valid, 
name, state, version', [ - (False, m_cls, {}, 'state', 1), - (False, 'name', m_cls, 'state', 1), - (False, 'name', {}, 'state', 1), - (False, 'name', {}, m_cls, 1), - (False, m_cls, {}, 'state', m_cls), - - (True, 'name', {}, 'initial', 1), - (True, None, {}, 'initial', 1), - (True, 'name', None, 'initial', 1), - (True, 'name', {}, 'initial', None), + (False, m_cls, 'state', 1), + (False, 'name', 'state', 1), + (False, 'name', m_cls, 1), + (False, m_cls, 'state', m_cls), + + (True, 'name', 'initial', 1), + (True, None, 'initial', 1), + (True, 'name', 'initial', 1), + (True, 'name', 'initial', None), ] ) - def test_node_model_creation(self, node_template_storage, is_valid, name, runtime_properties, - state, version): + def test_node_model_creation(self, node_template_storage, is_valid, name, state, version): node = _test_model( is_valid=is_valid, storage=node_template_storage, @@ -562,7 +560,6 @@ class TestNode(object): node_template=node_template_storage.node_template.list()[0], type=node_template_storage.type.list()[0], name=name, - runtime_properties=runtime_properties, state=state, version=version, service=node_template_storage.service.list()[0] @@ -635,7 +632,6 @@ class TestNodeHostAddress(object): name='node', node_template=node_template, type=storage.type.list()[0], - runtime_properties={}, state='initial', service=storage.service.list()[0] ) @@ -644,7 +640,7 @@ class TestNodeHostAddress(object): if host_address is not None: host_address = host_address.value if host_address: - kwargs['runtime_properties']['ip'] = host_address + kwargs.setdefault('attributes', {})['ip'] = Parameter.wrap('ip', host_address) if is_host: kwargs['host_fk'] = 1 elif host_fk: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/context/test_collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_collection_instrumentation.py b/tests/orchestrator/context/test_collection_instrumentation.py new file mode 100644 index 0000000..3ee5a44 --- /dev/null +++ b/tests/orchestrator/context/test_collection_instrumentation.py @@ -0,0 +1,253 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from aria.modeling.models import Parameter +from aria.orchestrator.context import collection_instrumentation + + +class MockActor(object): + def __init__(self): + self.dict_ = {} + self.list_ = [] + + +class MockModel(object): + + def __init__(self): + self.parameter = type('MockModel', (object, ), {'model_cls': Parameter, + 'put': lambda *args, **kwargs: None, + 'update': lambda *args, **kwargs: None})() + + +class CollectionInstrumentation(object): + + @pytest.fixture + def actor(self): + return MockActor() + + @pytest.fixture + def model(self): + return MockModel() + + @pytest.fixture + def dict_(self, actor, model): + return collection_instrumentation._InstrumentedDict(model, actor, 'dict_') + + @pytest.fixture + def list_(self, actor, model): + return collection_instrumentation._InstrumentedList(model, actor, 'list_') + + +class TestDict(CollectionInstrumentation): + + def test_keys(self, actor, dict_): + dict_.update( + { + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key2', 'value2') + } + ) + assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) + + def test_values(self, actor, dict_): + dict_.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2') + }) + assert (sorted(dict_.values()) == + sorted(['value1', 'value2']) == + sorted(v.value for v in actor.dict_.values())) + + def test_items(self, dict_): + dict_.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2') + }) + assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')]) + + def test_iter(self, actor, dict_): + dict_.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2') + }) + assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) + + def test_bool(self, dict_): + assert not dict_ + dict_.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2') + }) + assert dict_ + + def test_set_item(self, actor, dict_): + dict_['key1'] = Parameter.wrap('key1', 'value1') + assert dict_['key1'] == 'value1' == actor.dict_['key1'].value + assert isinstance(actor.dict_['key1'], Parameter) + + def test_nested(self, actor, dict_): + dict_['key'] = {} + assert isinstance(actor.dict_['key'], Parameter) + assert dict_['key'] == actor.dict_['key'].value == {} + + dict_['key']['inner_key'] = 'value' + + assert len(dict_) == 1 + assert 'inner_key' in dict_['key'] + assert dict_['key']['inner_key'] == 'value' + assert dict_['key'].keys() == ['inner_key'] + assert dict_['key'].values() == ['value'] + assert dict_['key'].items() == [('inner_key', 'value')] + assert isinstance(actor.dict_['key'], Parameter) + assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) + + dict_['key'].update({'updated_key': 'updated_value'}) + assert len(dict_) == 1 + assert 'updated_key' in dict_['key'] + assert dict_['key']['updated_key'] == 'updated_value' + assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key']) + assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value']) + assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'), + ('updated_key', 'updated_value')]) + assert isinstance(actor.dict_['key'], Parameter) + assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) + + dict_.update({'key': 'override_value'}) + assert len(dict_) == 1 + assert 'key' in dict_ + assert dict_['key'] == 
'override_value' + assert len(actor.dict_) == 1 + assert isinstance(actor.dict_['key'], Parameter) + assert actor.dict_['key'].value == 'override_value' + + def test_get_item(self, actor, dict_): + dict_['key1'] = Parameter.wrap('key1', 'value1') + assert isinstance(actor.dict_['key1'], Parameter) + + def test_update(self, actor, dict_): + dict_['key1'] = 'value1' + + new_dict = {'key2': 'value2'} + dict_.update(new_dict) + assert len(dict_) == 2 + assert dict_['key2'] == 'value2' + assert isinstance(actor.dict_['key2'], Parameter) + + new_dict = {} + new_dict.update(dict_) + assert new_dict['key1'] == dict_['key1'] + + def test_copy(self, dict_): + dict_['key1'] = 'value1' + + new_dict = dict_.copy() + assert new_dict is not dict_ + assert new_dict == dict_ + + dict_['key1'] = 'value2' + assert new_dict['key1'] == 'value1' + assert dict_['key1'] == 'value2' + + def test_clear(self, dict_): + dict_['key1'] = 'value1' + dict_.clear() + + assert len(dict_) == 0 + + +class TestList(CollectionInstrumentation): + + def test_append(self, actor, list_): + list_.append(Parameter.wrap('name', 'value1')) + list_.append('value2') + assert len(actor.list_) == 2 + assert len(list_) == 2 + assert isinstance(actor.list_[0], Parameter) + assert list_[0] == 'value1' + + assert isinstance(actor.list_[1], Parameter) + assert list_[1] == 'value2' + + list_[0] = 'new_value1' + list_[1] = 'new_value2' + assert isinstance(actor.list_[1], Parameter) + assert isinstance(actor.list_[1], Parameter) + assert list_[0] == 'new_value1' + assert list_[1] == 'new_value2' + + def test_iter(self, list_): + list_.append('value1') + list_.append('value2') + assert sorted(list_) == sorted(['value1', 'value2']) + + def test_insert(self, actor, list_): + list_.append('value1') + list_.insert(0, 'value2') + list_.insert(2, 'value3') + list_.insert(10, 'value4') + assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4']) + assert len(actor.list_) == 4 + + def test_set(self, list_): + list_.append('value1') + list_.append('value2') + + list_[1] = 'value3' + assert len(list_) == 2 + assert sorted(list_) == sorted(['value1', 'value3']) + + def test_insert_into_nested(self, actor, list_): + list_.append([]) + + list_[0].append('inner_item') + assert isinstance(actor.list_[0], Parameter) + assert len(list_) == 1 + assert list_[0][0] == 'inner_item' + + list_[0].append('new_item') + assert isinstance(actor.list_[0], Parameter) + assert len(list_) == 1 + assert list_[0][1] == 'new_item' + + assert list_[0] == ['inner_item', 'new_item'] + assert ['inner_item', 'new_item'] == list_[0] + + +class TestDictList(CollectionInstrumentation): + def test_dict_in_list(self, actor, list_): + list_.append({}) + assert len(list_) == 1 + assert isinstance(actor.list_[0], Parameter) + assert actor.list_[0].value == {} + + list_[0]['key'] = 'value' + assert list_[0]['key'] == 'value' + assert len(actor.list_) == 1 + assert isinstance(actor.list_[0], Parameter) + assert actor.list_[0].value['key'] == 'value' + + def test_list_in_dict(self, actor, dict_): + dict_['key'] = [] + assert len(dict_) == 1 + assert isinstance(actor.dict_['key'], Parameter) + assert actor.dict_['key'].value == [] + + dict_['key'].append('value') + assert dict_['key'][0] == 'value' + assert len(actor.dict_) == 1 + assert isinstance(actor.dict_['key'], Parameter) + assert actor.dict_['key'].value[0] == 'value' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/context/test_operation.py 
---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index cdeb5fa..5d193bc 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -343,6 +343,74 @@ def test_relationship_operation_logging(ctx, executor): _assert_loggins(ctx, inputs) +def test_attribute_consumption(ctx, executor, dataholder): + # region Updating node operation + node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + + source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) + + inputs = {'dict_': {'key': 'value'}, + 'set_test_dict': {'key2': 'value2'}} + interface = mock.models.create_interface( + source_node.service, + node_int_name, + node_op_name, + operation_kwargs=dict( + implementation=op_path(attribute_altering_operation, module_path=__name__), + inputs=inputs) + ) + source_node.interfaces[interface.name] = interface + ctx.model.node.update(source_node) + # endregion + + # region updating relationship operation + rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2] + + relationship = ctx.model.relationship.list()[0] + interface = mock.models.create_interface( + relationship.source_node.service, + rel_int_name, + rel_op_name, + operation_kwargs=dict( + implementation=op_path(attribute_consuming_operation, module_path=__name__), + inputs={'holder_path': dataholder.path} + ) + ) + relationship.interfaces[interface.name] = interface + ctx.model.relationship.update(relationship) + # endregion + + @workflow + def basic_workflow(graph, **_): + graph.sequence( + api.task.OperationTask( + source_node, + interface_name=node_int_name, + operation_name=node_op_name, + inputs=inputs + ), + api.task.OperationTask( + relationship, + interface_name=rel_int_name, + operation_name=rel_op_name, + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + assert len(source_node.attributes) == len(target_node.attributes) == 2 + assert source_node.attributes['key'] != target_node.attributes['key'] + assert source_node.attributes['key'].value == \ + target_node.attributes['key'].value == \ + dataholder['key'] == 'value' + + assert source_node.attributes['key2'] != target_node.attributes['key2'] + assert source_node.attributes['key2'].value == \ + target_node.attributes['key2'].value == \ + dataholder['key2'] == 'value2' + + def _assert_loggins(ctx, inputs): # The logs should contain the following: Workflow Start, Operation Start, custom operation @@ -377,10 +445,10 @@ def _assert_loggins(ctx, inputs): @operation def logged_operation(ctx, **_): - ctx.logger.info(ctx.task.inputs['op_start']) + ctx.logger.info(ctx.task.inputs['op_start'].value) # enables to check the relation between the created_at field properly time.sleep(1) - ctx.logger.debug(ctx.task.inputs['op_end']) + ctx.logger.debug(ctx.task.inputs['op_end'].value) @operation @@ -422,3 +490,21 @@ def get_node_id(ctx, holder_path, **_): def _test_plugin_workdir(ctx, filename, content): with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f: f.write(content) + + +@operation +def attribute_altering_operation(ctx, dict_, set_test_dict, **_): + ctx.node.attributes.update(dict_) + + for key, value in set_test_dict.items(): + ctx.node.attributes[key] = value + + +@operation +def attribute_consuming_operation(ctx, holder_path, **_): + 
holder = helpers.FilesystemDataHolder(holder_path) + ctx.target_node.attributes.update(ctx.source_node.attributes) + holder.update(**ctx.target_node.attributes) + + ctx.target_node.attributes['key2'] = ctx.source_node.attributes['key2'] + holder['key2'] = ctx.target_node.attributes['key2'] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/context/test_toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py index d199954..fc34907 100644 --- a/tests/orchestrator/context/test_toolbelt.py +++ b/tests/orchestrator/context/test_toolbelt.py @@ -16,6 +16,7 @@ import pytest from aria import workflow, operation +from aria.modeling import models from aria.orchestrator import context from aria.orchestrator.workflows import api from aria.orchestrator.workflows.executor import thread @@ -93,7 +94,7 @@ def test_host_ip(workflow_context, executor, dataholder): operation_kwargs=dict(implementation=op_path(host_ip, module_path=__name__), inputs=inputs) ) dependency_node.interfaces[interface.name] = interface - dependency_node.runtime_properties['ip'] = '1.1.1.1' + dependency_node.attributes['ip'] = models.Parameter.wrap('ip', '1.1.1.1') workflow_context.model.node.update(dependency_node) @@ -110,7 +111,7 @@ def test_host_ip(workflow_context, executor, dataholder): execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) - assert dataholder.get('host_ip') == dependency_node.runtime_properties.get('ip') + assert dataholder.get('host_ip') == dependency_node.attributes.get('ip').value def test_relationship_tool_belt(workflow_context, executor, dataholder): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 09d0499..d9115e1 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -43,26 +43,26 @@ class TestLocalRunScript(object): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties map.key value + ctx node attributes map.key value ''', windows_script=''' - ctx node runtime-properties map.key value + ctx node attributes map.key value ''') props = self._run( executor, workflow_context, script_path=script_path) - assert props['map']['key'] == 'value' + assert props['map'].value['key'] == 'value' def test_process_env(self, executor, workflow_context, tmpdir): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties map.key1 $key1 - ctx node runtime-properties map.key2 $key2 + ctx node attributes map.key1 $key1 + ctx node attributes map.key2 $key2 ''', windows_script=''' - ctx node runtime-properties map.key1 %key1% - ctx node runtime-properties map.key2 %key2% + ctx node attributes map.key1 %key1% + ctx node attributes map.key2 %key2% ''') props = self._run( executor, workflow_context, @@ -73,7 +73,7 @@ class TestLocalRunScript(object): 'key2': 'value2' } }) - p_map = props['map'] + p_map = props['map'].value assert p_map['key1'] == 'value1' assert p_map['key2'] == 'value2' @@ -81,10 +81,10 @@ class TestLocalRunScript(object): script_path = self._create_script( tmpdir, linux_script='''#! 
/bin/bash -e - ctx node runtime-properties map.cwd $PWD + ctx node attributes map.cwd $PWD ''', windows_script=''' - ctx node runtime-properties map.cwd %CD% + ctx node attributes map.cwd %CD% ''') tmpdir = str(tmpdir) props = self._run( @@ -93,11 +93,11 @@ class TestLocalRunScript(object): process={ 'cwd': tmpdir }) - p_map = props['map'] + p_map = props['map'].value assert p_map['cwd'] == tmpdir def test_process_command_prefix(self, executor, workflow_context, tmpdir): - use_ctx = 'ctx node runtime-properties map.key value' + use_ctx = 'ctx node attributes map.key value' python_script = ['import subprocess', 'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)] python_script = '\n'.join(python_script) @@ -114,19 +114,19 @@ class TestLocalRunScript(object): 'env': {'TEST_KEY': 'value'}, 'command_prefix': 'python' }) - p_map = props['map'] + p_map = props['map'].value assert p_map['key'] == 'value' def test_process_args(self, executor, workflow_context, tmpdir): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties map.arg1 "$1" - ctx node runtime-properties map.arg2 $2 + ctx node attributes map.arg1 "$1" + ctx node attributes map.arg2 $2 ''', windows_script=''' - ctx node runtime-properties map.arg1 %1 - ctx node runtime-properties map.arg2 %2 + ctx node attributes map.arg1 %1 + ctx node attributes map.arg2 %2 ''') props = self._run( executor, workflow_context, @@ -134,8 +134,8 @@ class TestLocalRunScript(object): process={ 'args': ['"arg with spaces"', 'arg2'] }) - assert props['map']['arg1'] == 'arg with spaces' - assert props['map']['arg2'] == 'arg2' + assert props['map'].value['arg1'] == 'arg with spaces' + assert props['map'].value['arg2'] == 'arg2' def test_no_script_path(self, executor, workflow_context): exception = self._run_and_get_task_exception( @@ -187,7 +187,7 @@ class TestLocalRunScript(object): script = ''' from aria.orchestrator.execution_plugin import ctx, inputs if __name__ == '__main__': - ctx.node.runtime_properties['key'] = inputs['key'] + ctx.node.attributes['key'] = inputs['key'] ''' suffix = '.py' script_path = self._create_script( @@ -200,7 +200,7 @@ if __name__ == '__main__': executor, workflow_context, script_path=script_path, inputs={'key': 'value'}) - assert props['key'] == 'value' + assert props['key'].value == 'value' @pytest.mark.parametrize( 'value', ['string-value', [1, 2, 3], 999, 3.14, False, @@ -209,16 +209,17 @@ if __name__ == '__main__': script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties key "${input_as_env_var}" + ctx node attributes key "${input_as_env_var}" ''', windows_script=''' - ctx node runtime-properties key "%input_as_env_var%" + ctx node attributes key "%input_as_env_var%" ''') props = self._run( executor, workflow_context, script_path=script_path, env_var=value) - expected = props['key'] if isinstance(value, basestring) else json.loads(props['key']) + value = props['key'].value + expected = value if isinstance(value, basestring) else json.loads(value) assert expected == value @pytest.mark.parametrize('value', ['override', {'key': 'value'}]) @@ -227,10 +228,10 @@ if __name__ == '__main__': script_path = self._create_script( tmpdir, linux_script='''#! 
/bin/bash -e - ctx node runtime-properties key "${input_as_env_var}" + ctx node attributes key "${input_as_env_var}" ''', windows_script=''' - ctx node runtime-properties key "%input_as_env_var%" + ctx node attributes key "%input_as_env_var%" ''') props = self._run( @@ -242,17 +243,18 @@ if __name__ == '__main__': 'input_as_env_var': value } }) - expected = props['key'] if isinstance(value, basestring) else json.loads(props['key']) + value = props['key'].value + expected = value if isinstance(value, basestring) else json.loads(value) assert expected == value def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties nonexistent + ctx node attributes nonexistent ''', windows_script=''' - ctx node runtime-properties nonexistent + ctx node attributes nonexistent ''') exception = self._run_and_get_task_exception( executor, workflow_context, @@ -266,10 +268,10 @@ if __name__ == '__main__': script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx -j instance runtime-properties nonexistent + ctx -j instance attributes nonexistent ''', windows_script=''' - ctx -j instance runtime-properties nonexistent + ctx -j instance attributes nonexistent ''') exception = self._run_and_get_task_exception( executor, workflow_context, @@ -502,7 +504,7 @@ if __name__ == '__main__': tasks_graph=tasks_graph) eng.execute() return workflow_context.model.node.get_by_name( - mock.models.DEPENDENCY_NODE_NAME).runtime_properties + mock.models.DEPENDENCY_NODE_NAME).attributes @pytest.fixture def executor(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index a9dc5e8..92d250e 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -53,9 +53,9 @@ _FABRIC_ENV = { class TestWithActualSSHServer(object): def test_run_script_basic(self): - expected_runtime_property_value = 'some_value' - props = self._execute(env={'test_value': expected_runtime_property_value}) - assert props['test_value'] == expected_runtime_property_value + expected_attribute_value = 'some_value' + props = self._execute(env={'test_value': expected_attribute_value}) + assert props['test_value'].value == expected_attribute_value @pytest.mark.skip(reason='sudo privileges are required') def test_run_script_as_sudo(self): @@ -66,7 +66,7 @@ class TestWithActualSSHServer(object): def test_run_script_default_base_dir(self): props = self._execute() - assert props['work_dir'] == '{0}/work'.format(constants.DEFAULT_BASE_DIR) + assert props['work_dir'].value == '{0}/work'.format(constants.DEFAULT_BASE_DIR) @pytest.mark.skip(reason='Re-enable once output from process executor can be captured') @pytest.mark.parametrize('hide_groups', [[], ['everything']]) @@ -93,16 +93,16 @@ class TestWithActualSSHServer(object): 'cwd': expected_cwd, 'base_dir': expected_base_dir }) - assert props['env_value'] == expected_env_value - assert len(props['bash_version']) > 0 - assert props['arg1_value'] == expected_arg1_value - assert props['arg2_value'] == expected_arg2_value - assert props['cwd'] == expected_cwd - assert props['ctx_path'] == '{0}/ctx'.format(expected_base_dir) + assert props['env_value'].value == 
expected_env_value + assert len(props['bash_version'].value) > 0 + assert props['arg1_value'].value == expected_arg1_value + assert props['arg2_value'].value == expected_arg2_value + assert props['cwd'].value == expected_cwd + assert props['ctx_path'].value == '{0}/ctx'.format(expected_base_dir) def test_run_script_command_prefix(self): props = self._execute(process={'command_prefix': 'bash -i'}) - assert 'i' in props['dollar_dash'] + assert 'i' in props['dollar_dash'].value def test_run_script_reuse_existing_ctx(self): expected_test_value_1 = 'test_value_1' @@ -112,27 +112,27 @@ class TestWithActualSSHServer(object): '{0}_2'.format(self.test_name)], env={'test_value1': expected_test_value_1, 'test_value2': expected_test_value_2}) - assert props['test_value1'] == expected_test_value_1 - assert props['test_value2'] == expected_test_value_2 + assert props['test_value1'].value == expected_test_value_1 + assert props['test_value2'].value == expected_test_value_2 def test_run_script_download_resource_plain(self, tmpdir): resource = tmpdir.join('resource') resource.write('content') self._upload(str(resource), 'test_resource') props = self._execute() - assert props['test_value'] == 'content' + assert props['test_value'].value == 'content' def test_run_script_download_resource_and_render(self, tmpdir): resource = tmpdir.join('resource') resource.write('{{ctx.service.name}}') self._upload(str(resource), 'test_resource') props = self._execute() - assert props['test_value'] == self._workflow_context.service.name + assert props['test_value'].value == self._workflow_context.service.name @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}]) def test_run_script_inputs_as_env_variables_no_override(self, value): props = self._execute(custom_input=value) - return_value = props['test_value'] + return_value = props['test_value'].value expected = return_value if isinstance(value, basestring) else json.loads(return_value) assert value == expected @@ -140,7 +140,7 @@ class TestWithActualSSHServer(object): def test_run_script_inputs_as_env_variables_process_env_override(self, value): props = self._execute(custom_input='custom-input-value', env={'custom_env_var': value}) - return_value = props['test_value'] + return_value = props['test_value'].value expected = return_value if isinstance(value, basestring) else json.loads(return_value) assert value == expected @@ -260,7 +260,7 @@ class TestWithActualSSHServer(object): tasks_graph=tasks_graph) eng.execute() return self._workflow_context.model.node.get_by_name( - mock.models.DEPENDENCY_NODE_NAME).runtime_properties + mock.models.DEPENDENCY_NODE_NAME).attributes def _execute_and_get_task_exception(self, *args, **kwargs): signal = events.on_failure_task_signal http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index 50ca7f5..e488933 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -100,7 +100,7 @@ class TestOperationTask(object): storage_task = ctx.model.task.get_by_name(core_task.name) assert storage_task.plugin is storage_plugin assert storage_task.execution_name == ctx.execution.name - assert storage_task.actor == core_task.context.node + assert storage_task.actor == core_task.context.node._original_model assert 
core_task.model_task == storage_task assert core_task.name == api_task.name assert core_task.implementation == api_task.implementation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index 375c44e..41c4b2e 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -74,3 +74,7 @@ class MockContext(object): return cls(storage=aria.application_model_storage(**kwargs)) else: return cls() + + @staticmethod + def close(): + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py index 1dbfae1..92f0fc4 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -17,7 +17,6 @@ import time import fasteners import pytest -from aria.storage.exceptions import StorageError from aria.orchestrator import events from aria.orchestrator.workflows.exceptions import ExecutorException from aria.orchestrator.workflows import api @@ -29,47 +28,37 @@ from tests.orchestrator.context import execute as execute_workflow from tests.orchestrator.workflows.helpers import events_collector from tests import mock from tests import storage +from tests import helpers -def test_concurrent_modification_on_task_succeeded(context, executor, lock_files): - _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True) - - -@operation -def _test_task_succeeded(ctx, lock_files, key, first_value, second_value): - _concurrent_update(lock_files, ctx.node, key, first_value, second_value) +@pytest.fixture +def dataholder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = helpers.FilesystemDataHolder(dataholder_path) + return holder -def test_concurrent_modification_on_task_failed(context, executor, lock_files): - _test(context, executor, lock_files, _test_task_failed, expected_failure=True) +def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False) @operation -def _test_task_failed(ctx, lock_files, key, first_value, second_value): - first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value) - if not first: - raise RuntimeError('MESSAGE') +def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path): + _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) -def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files): - _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False) +def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True) @operation -def _test_update_and_refresh(ctx, lock_files, 
key, first_value, second_value): - node = ctx.node - first = _concurrent_update(lock_files, node, key, first_value, second_value) +def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path): + first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) if not first: - try: - ctx.model.node.update(node) - except StorageError as e: - assert 'Version conflict' in str(e) - ctx.model.node.refresh(node) - else: - raise RuntimeError('Unexpected') + raise RuntimeError('MESSAGE') -def _test(context, executor, lock_files, func, expected_failure): +def _test(context, executor, lock_files, func, dataholder, expected_failure): def _node(ctx): return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) @@ -82,7 +71,8 @@ def _test(context, executor, lock_files, func, expected_failure): 'lock_files': lock_files, 'key': key, 'first_value': first_value, - 'second_value': second_value + 'second_value': second_value, + 'holder_path': dataholder.path } node = _node(context) @@ -118,17 +108,13 @@ def _test(context, executor, lock_files, func, expected_failure): except ExecutorException: pass - props = _node(context).runtime_properties - assert props[key] == first_value + props = _node(context).attributes + assert dataholder['invocations'] == 2 + assert props[key].value == dataholder[key] exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])] if expected_failure: assert exceptions - exception = exceptions[-1] - assert isinstance(exception, StorageError) - assert 'Version conflict' in str(exception) - else: - assert not exceptions @pytest.fixture @@ -150,8 +136,8 @@ def lock_files(tmpdir): return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file')) -def _concurrent_update(lock_files, node, key, first_value, second_value): - +def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path): + holder = helpers.FilesystemDataHolder(holder_path) locker1 = fasteners.InterProcessLock(lock_files[0]) locker2 = fasteners.InterProcessLock(lock_files[1]) @@ -161,11 +147,14 @@ def _concurrent_update(lock_files, node, key, first_value, second_value): # Give chance for both processes to acquire locks while locker2.acquire(blocking=False): locker2.release() - time.sleep(0.01) + time.sleep(0.1) else: locker2.acquire() - node.runtime_properties[key] = first_value if first else second_value + node.attributes[key] = first_value if first else second_value + holder['key'] = first_value if first else second_value + holder.setdefault('invocations', 0) + holder['invocations'] += 1 if first: locker1.release() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index 878ac24..30b23ed 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -56,7 +56,7 @@ def test_decorate_extension(context, executor): graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) eng.execute() - out = get_node(context).runtime_properties['out'] + out = get_node(context).attributes.get('out').value 
assert out['wrapper_inputs'] == inputs assert out['function_inputs'] == inputs @@ -67,7 +67,7 @@ class MockProcessExecutorExtension(object): def decorate(self): def decorator(function): def wrapper(ctx, **operation_inputs): - ctx.node.runtime_properties['out'] = {'wrapper_inputs': operation_inputs} + ctx.node.attributes['out'] = {'wrapper_inputs': operation_inputs} function(ctx=ctx, **operation_inputs) return wrapper return decorator @@ -75,7 +75,7 @@ class MockProcessExecutorExtension(object): @operation def _mock_operation(ctx, **operation_inputs): - ctx.node.runtime_properties['out']['function_inputs'] = operation_inputs + ctx.node.attributes['out']['function_inputs'] = operation_inputs @pytest.fixture
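The hunks above all follow the same migration pattern, so a brief sketch may help readers of this diff: outside an operation, attribute entries are Parameter models that are wrapped on write and read back through their .value, while inside an operation the instrumented ctx.node.attributes accepts and returns plain values (as exercised by the tests above). The snippet below is only an illustration of that pattern, not part of the commit; the node name 'my_node' and the model_storage handle are assumed to exist in the caller's setup.

    from aria import operation
    from aria.modeling import models

    # Outside an operation: attribute entries are Parameter models, so values
    # are wrapped explicitly on write and unwrapped via ``.value`` on read.
    # ``model_storage`` is an already-initialized ARIA model storage (assumed).
    node = model_storage.node.get_by_name('my_node')  # 'my_node' is hypothetical
    node.attributes['ip'] = models.Parameter.wrap('ip', '1.1.1.1')
    model_storage.node.update(node)
    assert node.attributes['ip'].value == '1.1.1.1'

    # Inside an operation: ctx.node.attributes is instrumented, so plain values
    # can be assigned and read back directly; wrapping is handled for you.
    @operation
    def configure(ctx, **_):
        ctx.node.attributes['ip'] = '1.1.1.1'
        assert ctx.node.attributes['ip'] == '1.1.1.1'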