Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-262-Inconsistent-node-attributes-behavior 1b86b08c6 -> 79a1f761f (forced update)
ARIA-262 Inconsistent node attributes behavior Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/79a1f761 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/79a1f761 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/79a1f761 Branch: refs/heads/ARIA-262-Inconsistent-node-attributes-behavior Commit: 79a1f761fb01c7b2d6ae1bb4b9adea7e61042426 Parents: b6d3c43 Author: max-orlov <ma...@gigaspaces.com> Authored: Wed May 31 21:07:49 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Tue Jun 6 14:27:11 2017 +0300 ---------------------------------------------------------------------- .../context/collection_instrumentation.py | 242 --------------- aria/orchestrator/context/operation.py | 21 +- aria/orchestrator/context/toolbelt.py | 6 +- aria/orchestrator/decorators.py | 6 +- .../execution_plugin/ctx_proxy/server.py | 17 +- aria/storage/api.py | 10 + aria/storage/collection_instrumentation.py | 306 +++++++++++++++++++ aria/storage/core.py | 16 + aria/storage/sql_mapi.py | 14 +- aria/utils/imports.py | 2 +- .../context/test_collection_instrumentation.py | 150 ++++++--- .../execution_plugin/test_ctx_proxy_server.py | 2 +- tests/orchestrator/execution_plugin/test_ssh.py | 10 +- tests/orchestrator/workflows/core/test_task.py | 2 +- .../executor/test_process_executor_extension.py | 3 +- 15 files changed, 491 insertions(+), 316 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/orchestrator/context/collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/collection_instrumentation.py b/aria/orchestrator/context/collection_instrumentation.py deleted file mode 100644 index 8f80d4a..0000000 --- a/aria/orchestrator/context/collection_instrumentation.py +++ /dev/null @@ -1,242 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from functools import partial - -from aria.modeling import models - - -class _InstrumentedCollection(object): - - def __init__(self, - model, - parent, - field_name, - seq=None, - is_top_level=True, - **kwargs): - self._model = model - self._parent = parent - self._field_name = field_name - self._is_top_level = is_top_level - self._load(seq, **kwargs) - - @property - def _raw(self): - raise NotImplementedError - - def _load(self, seq, **kwargs): - """ - Instantiates the object from existing seq. - - :param seq: the original sequence to load from - :return: - """ - raise NotImplementedError - - def _set(self, key, value): - """ - set the changes for the current object (not in the db) - - :param key: - :param value: - :return: - """ - raise NotImplementedError - - def _del(self, collection, key): - raise NotImplementedError - - def _instrument(self, key, value): - """ - Instruments any collection to track changes (and ease of access) - :param key: - :param value: - :return: - """ - if isinstance(value, _InstrumentedCollection): - return value - elif isinstance(value, dict): - instrumentation_cls = _InstrumentedDict - elif isinstance(value, list): - instrumentation_cls = _InstrumentedList - else: - return value - - return instrumentation_cls(self._model, self, key, value, False) - - @staticmethod - def _raw_value(value): - """ - Get the raw value. - :param value: - :return: - """ - if isinstance(value, models.Attribute): - return value.value - return value - - @staticmethod - def _encapsulate_value(key, value): - """ - Create a new item cls if needed. - :param key: - :param value: - :return: - """ - if isinstance(value, models.Attribute): - return value - # If it is not wrapped - return models.Attribute.wrap(key, value) - - def __setitem__(self, key, value): - """ - Update the values in both the local and the db locations. - :param key: - :param value: - :return: - """ - self._set(key, value) - if self._is_top_level: - # We are at the top level - field = getattr(self._parent, self._field_name) - mapi = getattr(self._model, models.Attribute.__modelname__) - value = self._set_field(field, - key, - value if key in field else self._encapsulate_value(key, value)) - mapi.update(value) - else: - # We are not at the top level - self._set_field(self._parent, self._field_name, self) - - def _set_field(self, collection, key, value): - """ - enables updating the current change in the ancestors - :param collection: the collection to change - :param key: the key for the specific field - :param value: the new value - :return: - """ - if isinstance(value, _InstrumentedCollection): - value = value._raw - if key in collection and isinstance(collection[key], models.Attribute): - if isinstance(collection[key], _InstrumentedCollection): - self._del(collection, key) - collection[key].value = value - else: - collection[key] = value - return collection[key] - - def __deepcopy__(self, *args, **kwargs): - return self._raw - - -class _InstrumentedDict(_InstrumentedCollection, dict): - - def _load(self, dict_=None, **kwargs): - dict.__init__( - self, - tuple((key, self._raw_value(value)) for key, value in (dict_ or {}).items()), - **kwargs) - - def update(self, dict_=None, **kwargs): - dict_ = dict_ or {} - for key, value in dict_.items(): - self[key] = value - for key, value in kwargs.items(): - self[key] = value - - def __getitem__(self, key): - return self._instrument(key, dict.__getitem__(self, key)) - - def _set(self, key, value): - dict.__setitem__(self, key, self._raw_value(value)) - - @property - def _raw(self): - return dict(self) - - def _del(self, collection, key): - del collection[key] - - -class _InstrumentedList(_InstrumentedCollection, list): - - def _load(self, list_=None, **kwargs): - list.__init__(self, list(item for item in list_ or [])) - - def append(self, value): - self.insert(len(self), value) - - def insert(self, index, value): - list.insert(self, index, self._raw_value(value)) - if self._is_top_level: - field = getattr(self._parent, self._field_name) - field.insert(index, self._encapsulate_value(index, value)) - else: - self._parent[self._field_name] = self - - def __getitem__(self, key): - return self._instrument(key, list.__getitem__(self, key)) - - def _set(self, key, value): - list.__setitem__(self, key, value) - - def _del(self, collection, key): - del collection[key] - - @property - def _raw(self): - return list(self) - - -class _InstrumentedModel(object): - - def __init__(self, field_name, original_model, model_storage): - super(_InstrumentedModel, self).__init__() - self._field_name = field_name - self._model_storage = model_storage - self._original_model = original_model - self._apply_instrumentation() - - def __getattr__(self, item): - return getattr(self._original_model, item) - - def _apply_instrumentation(self): - - field = getattr(self._original_model, self._field_name) - - # Preserve the original value. e.g. original attributes would be located under - # _attributes - setattr(self, '_{0}'.format(self._field_name), field) - - # set instrumented value - setattr(self, self._field_name, _InstrumentedDict(self._model_storage, - self._original_model, - self._field_name, - field)) - - -def instrument_collection(field_name, func=None): - if func is None: - return partial(instrument_collection, field_name) - - def _wrapper(*args, **kwargs): - original_model = func(*args, **kwargs) - return type('Instrumented{0}'.format(original_model.__class__.__name__), - (_InstrumentedModel, ), - {})(field_name, original_model, args[0].model) - - return _wrapper http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index f0ba337..af7220d 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -21,10 +21,7 @@ import threading import aria from aria.utils import file -from . import ( - common, - collection_instrumentation -) +from . import common class BaseOperationContext(common.BaseContext): @@ -32,6 +29,13 @@ class BaseOperationContext(common.BaseContext): Context object used during operation creation and execution """ + INSTRUMENTATION_FIELDS = ( + aria.modeling.models.Node.attributes, + aria.modeling.models.Node.properties, + aria.modeling.models.NodeTemplate.attributes, + aria.modeling.models.NodeTemplate.properties + ) + def __init__(self, task_id, actor_id, **kwargs): self._task_id = task_id self._actor_id = actor_id @@ -76,7 +80,6 @@ class BaseOperationContext(common.BaseContext): @property def serialization_dict(self): - context_cls = self.__class__ context_dict = { 'name': self.name, 'service_id': self._service_id, @@ -89,7 +92,7 @@ class BaseOperationContext(common.BaseContext): 'logger_level': self.logger.level } return { - 'context_cls': context_cls, + 'context_cls': self.__class__, 'context': context_dict } @@ -117,7 +120,6 @@ class NodeOperationContext(BaseOperationContext): """ @property - @collection_instrumentation.instrument_collection('attributes') def node_template(self): """ the node of the current operation @@ -126,7 +128,6 @@ class NodeOperationContext(BaseOperationContext): return self.node.node_template @property - @collection_instrumentation.instrument_collection('attributes') def node(self): """ The node instance of the current operation @@ -141,7 +142,6 @@ class RelationshipOperationContext(BaseOperationContext): """ @property - @collection_instrumentation.instrument_collection('attributes') def source_node_template(self): """ The source node @@ -150,7 +150,6 @@ class RelationshipOperationContext(BaseOperationContext): return self.source_node.node_template @property - @collection_instrumentation.instrument_collection('attributes') def source_node(self): """ The source node instance @@ -159,7 +158,6 @@ class RelationshipOperationContext(BaseOperationContext): return self.relationship.source_node @property - @collection_instrumentation.instrument_collection('attributes') def target_node_template(self): """ The target node @@ -168,7 +166,6 @@ class RelationshipOperationContext(BaseOperationContext): return self.target_node.node_template @property - @collection_instrumentation.instrument_collection('attributes') def target_node(self): """ The target node instance http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/orchestrator/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py index 5788ee7..b5a54a9 100644 --- a/aria/orchestrator/context/toolbelt.py +++ b/aria/orchestrator/context/toolbelt.py @@ -33,11 +33,7 @@ class NodeToolBelt(object): :return: """ assert isinstance(self._op_context, operation.NodeOperationContext) - host = self._op_context.node.host - ip = host.attributes.get('ip') - if ip: - return ip.value - + return self._op_context.node.host.attributes.get('ip') class RelationshipToolBelt(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/orchestrator/decorators.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py index 4051a54..80f6962 100644 --- a/aria/orchestrator/decorators.py +++ b/aria/orchestrator/decorators.py @@ -68,11 +68,13 @@ def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=No @wraps(func) def _wrapper(**func_kwargs): + ctx = func_kwargs['ctx'] if toolbelt: - operation_toolbelt = context.toolbelt(func_kwargs['ctx']) + operation_toolbelt = context.toolbelt(ctx) func_kwargs.setdefault('toolbelt', operation_toolbelt) validate_function_arguments(func, func_kwargs) - return func(**func_kwargs) + with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS): + return func(**func_kwargs) return _wrapper http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 102ff9a..50d4c3a 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -117,14 +117,15 @@ class CtxProxy(object): def _process(self, request): try: - typed_request = json.loads(request) - args = typed_request['args'] - payload = _process_ctx_request(self.ctx, args) - result_type = 'result' - if isinstance(payload, exceptions.ScriptException): - payload = dict(message=str(payload)) - result_type = 'stop_operation' - result = {'type': result_type, 'payload': payload} + with self.ctx.model.instrument(*self.ctx.INSTRUMENTATION_FIELDS): + typed_request = json.loads(request) + args = typed_request['args'] + payload = _process_ctx_request(self.ctx, args) + result_type = 'result' + if isinstance(payload, exceptions.ScriptException): + payload = dict(message=str(payload)) + result_type = 'stop_operation' + result = {'type': result_type, 'payload': payload} except Exception as e: traceback_out = StringIO.StringIO() traceback.print_exc(file=traceback_out) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/storage/api.py ---------------------------------------------------------------------- diff --git a/aria/storage/api.py b/aria/storage/api.py index ed8a2ff..3304721 100644 --- a/aria/storage/api.py +++ b/aria/storage/api.py @@ -15,6 +15,7 @@ """ General storage API """ +import threading class StorageAPI(object): @@ -45,6 +46,15 @@ class ModelAPI(StorageAPI): super(ModelAPI, self).__init__(**kwargs) self._model_cls = model_cls self._name = name or model_cls.__modelname__ + self._thread_local = threading.local() + self._thread_local._instrumentation = [] + + @property + def _instrumentation(self): + if not hasattr(self._thread_local, '_instrumentation'): + self._thread_local._instrumentation = [] + return self._thread_local._instrumentation + @property def name(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/storage/collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/collection_instrumentation.py b/aria/storage/collection_instrumentation.py new file mode 100644 index 0000000..27d8322 --- /dev/null +++ b/aria/storage/collection_instrumentation.py @@ -0,0 +1,306 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import exceptions + + +class _InstrumentedCollection(object): + + def __init__(self, + mapi, + parent, + field_name, + field_cls, + seq=None, + is_top_level=True, + **kwargs): + self._mapi = mapi + self._parent = parent + self._field_name = field_name + self._is_top_level = is_top_level + self._field_cls = field_cls + self._load(seq, **kwargs) + + @property + def _raw(self): + raise NotImplementedError + + def _load(self, seq, **kwargs): + """ + Instantiates the object from existing seq. + + :param seq: the original sequence to load from + :return: + """ + raise NotImplementedError + + def _set(self, key, value): + """ + set the changes for the current object (not in the db) + + :param key: + :param value: + :return: + """ + raise NotImplementedError + + def _del(self, collection, key): + raise NotImplementedError + + def _instrument(self, key, value): + """ + Instruments any collection to track changes (and ease of access) + :param key: + :param value: + :return: + """ + if isinstance(value, _InstrumentedCollection): + return value + elif isinstance(value, dict): + instrumentation_cls = _InstrumentedDict + elif isinstance(value, list): + instrumentation_cls = _InstrumentedList + else: + return value + + return instrumentation_cls(self._mapi, self, key, self._field_cls, value, False) + + def _raw_value(self, value): + """ + Get the raw value. + :param value: + :return: + """ + if isinstance(value, self._field_cls): + return value.value + return value + + def _encapsulate_value(self, key, value): + """ + Create a new item cls if needed. + :param key: + :param value: + :return: + """ + if isinstance(value, self._field_cls): + return value + # If it is not wrapped + return self._field_cls.wrap(key, value) + + def __setitem__(self, key, value): + """ + Update the values in both the local and the db locations. + :param key: + :param value: + :return: + """ + self._set(key, value) + if self._is_top_level: + # We are at the top level + field = getattr(self._parent, self._field_name) + self._set_field( + field, key, value if key in field else self._encapsulate_value(key, value)) + self._mapi.update(self._parent) + else: + # We are not at the top level + self._set_field(self._parent, self._field_name, self) + + def _set_field(self, collection, key, value): + """ + enables updating the current change in the ancestors + :param collection: the collection to change + :param key: the key for the specific field + :param value: the new value + :return: + """ + if isinstance(value, _InstrumentedCollection): + value = value._raw + if key in collection and isinstance(collection[key], self._field_cls): + if isinstance(collection[key], _InstrumentedCollection): + self._del(collection, key) + collection[key].value = value + else: + collection[key] = value + return collection[key] + + def __deepcopy__(self, *args, **kwargs): + return self._raw + + +class _InstrumentedDict(_InstrumentedCollection, dict): + + def _load(self, dict_=None, **kwargs): + dict.__init__( + self, + tuple((key, self._raw_value(value)) for key, value in (dict_ or {}).items()), + **kwargs) + + def update(self, dict_=None, **kwargs): + dict_ = dict_ or {} + for key, value in dict_.items(): + self[key] = value + for key, value in kwargs.items(): + self[key] = value + + def __getitem__(self, key): + return self._instrument(key, dict.__getitem__(self, key)) + + def _set(self, key, value): + dict.__setitem__(self, key, self._raw_value(value)) + + @property + def _raw(self): + return dict(self) + + def _del(self, collection, key): + del collection[key] + + +class _InstrumentedList(_InstrumentedCollection, list): + + def _load(self, list_=None, **kwargs): + list.__init__(self, list(item for item in list_ or [])) + + def append(self, value): + self.insert(len(self), value) + + def insert(self, index, value): + list.insert(self, index, self._raw_value(value)) + if self._is_top_level: + field = getattr(self._parent, self._field_name) + field.insert(index, self._encapsulate_value(index, value)) + else: + self._parent[self._field_name] = self + + def __getitem__(self, key): + return self._instrument(key, list.__getitem__(self, key)) + + def _set(self, key, value): + list.__setitem__(self, key, value) + + def _del(self, collection, key): + del collection[key] + + @property + def _raw(self): + return list(self) + + +class _InstrumentedModel(object): + + def __init__(self, original_model, mapi, instrumentation): + """ + The original model + :param original_model: the model to be instrumented + :param mapi: the mapi for that model + """ + super(_InstrumentedModel, self).__init__() + self._original_model = original_model + self._mapi = mapi + self._instrumentation = instrumentation + self._apply_instrumentation() + + def __getattr__(self, item): + return_value = getattr(self._original_model, item) + if isinstance(return_value, self._original_model.__class__): + return _create_instrumented_model(return_value, self._mapi, self._instrumentation) + if isinstance(return_value, (list, dict)): + return _create_wrapped_model(return_value, self._mapi, self._instrumentation) + return return_value + + def _apply_instrumentation(self): + for field in self._instrumentation: + field_name = field.key + field_cls = field.mapper.class_ + field = getattr(self._original_model, field_name) + + # Preserve the original value. e.g. original attributes would be located under + # _attributes + setattr(self, '_{0}'.format(field_name), field) + + # set instrumented value + if isinstance(field, dict): + instrumentation_cls = _InstrumentedDict + elif isinstance(field, list): + instrumentation_cls = _InstrumentedList + else: + # TODO: raise proper error + raise exceptions.StorageError( + "ARIA supports instrumentation for dict and list. Field {field} of the " + "class {model} is of {type} type.".format( + field=field, + model=self._original_model, + type=type(field))) + + instrumented_class = instrumentation_cls(seq=field, + parent=self._original_model, + mapi=self._mapi, + field_name=field_name, + field_cls=field_cls) + setattr(self, field_name, instrumented_class) + + +class _WrappedModel(object): + + def __init__(self, wrapped, instrumentation, **kwargs): + """ + + :param instrumented_cls: The class to be instrumented + :param instrumentation_cls: the instrumentation cls + :param wrapped: the currently wrapped instance + :param kwargs: and kwargs to the passed to the instrumented class. + """ + self._kwargs = kwargs + self._instrumentation = instrumentation + self._wrapped = wrapped + + def _wrap(self, value): + if value.__class__ in (class_.class_ for class_ in self._instrumentation): + return _create_instrumented_model( + value, instrumentation=self._instrumentation, **self._kwargs) + elif hasattr(value, 'metadata') or isinstance(value, (dict, list)): + # Basically checks that the value is indeed an sqlmodel (it should have metadata) + return _create_wrapped_model( + value, instrumentation=self._instrumentation, **self._kwargs) + return value + + def __getattr__(self, item): + if hasattr(self, '_wrapped'): + return self._wrap(getattr(self._wrapped, item)) + else: + super(_WrappedModel, self).__getattribute__(item) + + def __getitem__(self, item): + return self._wrap(self._wrapped[item]) + + +def _create_instrumented_model(original_model, mapi, instrumentation, **kwargs): + return type('Instrumented{0}'.format(original_model.__class__.__name__), + (_InstrumentedModel,), + {})(original_model, mapi, instrumentation, **kwargs) + + +def _create_wrapped_model(original_model, mapi, instrumentation, **kwargs): + return type('Wrapped{0}'.format(original_model.__class__.__name__), + (_WrappedModel, ), + {})(original_model, instrumentation, mapi=mapi, **kwargs) + + +def instrument(instrumentation, original_model, mapi): + for instrumented_field in instrumentation: + if isinstance(original_model, instrumented_field.class_): + return _create_instrumented_model(original_model, mapi, instrumentation) + + return _create_wrapped_model(original_model, mapi, instrumentation) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py index 8302fc9..f8bac51 100644 --- a/aria/storage/core.py +++ b/aria/storage/core.py @@ -37,6 +37,8 @@ API: * drivers - module, a pool of ARIA standard drivers. * StorageDriver - class, abstract model implementation. """ +import copy +from contextlib import contextmanager from aria.logger import LoggerMixin from . import sql_mapi @@ -165,3 +167,17 @@ class ModelStorage(Storage): """ for mapi in self.registered.values(): mapi.drop() + + @contextmanager + def instrument(self, *instrumentation): + original_instrumentation = {} + + try: + for mapi in self.registered.values(): + original_instrumentation[mapi] = copy.copy(mapi._instrumentation) + mapi._instrumentation.extend(instrumentation) + yield self + finally: + for mapi in self.registered.values(): + mapi._instrumentation[:] = original_instrumentation[mapi] + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/storage/sql_mapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py index 730d007..4d7e233 100644 --- a/aria/storage/sql_mapi.py +++ b/aria/storage/sql_mapi.py @@ -29,6 +29,7 @@ from aria.utils.collections import OrderedDict from . import ( api, exceptions, + collection_instrumentation ) _predicates = {'ge': '__ge__', @@ -63,7 +64,7 @@ class SQLAlchemyModelAPI(api.ModelAPI): 'Requested `{0}` with ID `{1}` was not found' .format(self.model_cls.__name__, entry_id) ) - return result + return self._instrument(result) def get_by_name(self, entry_name, include=None, **kwargs): assert hasattr(self.model_cls, 'name') @@ -93,7 +94,7 @@ class SQLAlchemyModelAPI(api.ModelAPI): return ListResult( dict(total=total, size=size, offset=offset), - results + [self._instrument(result) for result in results] ) def iter(self, @@ -103,7 +104,8 @@ class SQLAlchemyModelAPI(api.ModelAPI): **kwargs): """Return a (possibly empty) list of `model_class` results """ - return iter(self._get_query(include, filters, sort)) + for result in self._get_query(include, filters, sort): + yield self._instrument(result) def put(self, entry, **kwargs): """Create a `model_class` instance from a serializable `model` object @@ -378,6 +380,12 @@ class SQLAlchemyModelAPI(api.ModelAPI): for rel in instance.__mapper__.relationships: getattr(instance, rel.key) + def _instrument(self, model): + if self._instrumentation: + return collection_instrumentation.instrument(self._instrumentation, model, self) + else: + return model + def init_storage(base_dir, filename='db.sqlite'): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/aria/utils/imports.py ---------------------------------------------------------------------- diff --git a/aria/utils/imports.py b/aria/utils/imports.py index 64a48cf..35aa0fc 100644 --- a/aria/utils/imports.py +++ b/aria/utils/imports.py @@ -17,8 +17,8 @@ Utility methods for dynamically loading python code """ -import importlib import pkgutil +import importlib def import_fullname(name, paths=None): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/tests/orchestrator/context/test_collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_collection_instrumentation.py b/tests/orchestrator/context/test_collection_instrumentation.py index 1e6214a..ae3e8ac 100644 --- a/tests/orchestrator/context/test_collection_instrumentation.py +++ b/tests/orchestrator/context/test_collection_instrumentation.py @@ -15,8 +15,14 @@ import pytest -from aria.modeling.models import Attribute -from aria.orchestrator.context import collection_instrumentation +from aria.modeling import models +from aria.storage import collection_instrumentation +from aria.orchestrator.context import operation + +from tests import ( + mock, + storage +) class MockActor(object): @@ -25,12 +31,16 @@ class MockActor(object): self.list_ = [] -class MockModel(object): +class MockMAPI(object): def __init__(self): - self.attribute = type('MockModel', (object, ), {'model_cls': Attribute, - 'put': lambda *args, **kwargs: None, - 'update': lambda *args, **kwargs: None})() + pass + + def put(self, *args, **kwargs): + pass + + def update(self, *args, **kwargs): + pass class CollectionInstrumentation(object): @@ -41,15 +51,15 @@ class CollectionInstrumentation(object): @pytest.fixture def model(self): - return MockModel() + return MockMAPI() @pytest.fixture def dict_(self, actor, model): - return collection_instrumentation._InstrumentedDict(model, actor, 'dict_') + return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute) @pytest.fixture def list_(self, actor, model): - return collection_instrumentation._InstrumentedList(model, actor, 'list_') + return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute) class TestDict(CollectionInstrumentation): @@ -57,16 +67,16 @@ class TestDict(CollectionInstrumentation): def test_keys(self, actor, dict_): dict_.update( { - 'key1': Attribute.wrap('key1', 'value1'), - 'key2': Attribute.wrap('key2', 'value2') + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key2', 'value2') } ) assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) def test_values(self, actor, dict_): dict_.update({ - 'key1': Attribute.wrap('key1', 'value1'), - 'key2': Attribute.wrap('key1', 'value2') + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') }) assert (sorted(dict_.values()) == sorted(['value1', 'value2']) == @@ -74,34 +84,34 @@ class TestDict(CollectionInstrumentation): def test_items(self, dict_): dict_.update({ - 'key1': Attribute.wrap('key1', 'value1'), - 'key2': Attribute.wrap('key1', 'value2') + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') }) assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')]) def test_iter(self, actor, dict_): dict_.update({ - 'key1': Attribute.wrap('key1', 'value1'), - 'key2': Attribute.wrap('key1', 'value2') + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') }) assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) def test_bool(self, dict_): assert not dict_ dict_.update({ - 'key1': Attribute.wrap('key1', 'value1'), - 'key2': Attribute.wrap('key1', 'value2') + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') }) assert dict_ def test_set_item(self, actor, dict_): - dict_['key1'] = Attribute.wrap('key1', 'value1') + dict_['key1'] = models.Attribute.wrap('key1', 'value1') assert dict_['key1'] == 'value1' == actor.dict_['key1'].value - assert isinstance(actor.dict_['key1'], Attribute) + assert isinstance(actor.dict_['key1'], models.Attribute) def test_nested(self, actor, dict_): dict_['key'] = {} - assert isinstance(actor.dict_['key'], Attribute) + assert isinstance(actor.dict_['key'], models.Attribute) assert dict_['key'] == actor.dict_['key'].value == {} dict_['key']['inner_key'] = 'value' @@ -112,7 +122,7 @@ class TestDict(CollectionInstrumentation): assert dict_['key'].keys() == ['inner_key'] assert dict_['key'].values() == ['value'] assert dict_['key'].items() == [('inner_key', 'value')] - assert isinstance(actor.dict_['key'], Attribute) + assert isinstance(actor.dict_['key'], models.Attribute) assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) dict_['key'].update({'updated_key': 'updated_value'}) @@ -123,7 +133,7 @@ class TestDict(CollectionInstrumentation): assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value']) assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'), ('updated_key', 'updated_value')]) - assert isinstance(actor.dict_['key'], Attribute) + assert isinstance(actor.dict_['key'], models.Attribute) assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) dict_.update({'key': 'override_value'}) @@ -131,12 +141,12 @@ class TestDict(CollectionInstrumentation): assert 'key' in dict_ assert dict_['key'] == 'override_value' assert len(actor.dict_) == 1 - assert isinstance(actor.dict_['key'], Attribute) + assert isinstance(actor.dict_['key'], models.Attribute) assert actor.dict_['key'].value == 'override_value' def test_get_item(self, actor, dict_): - dict_['key1'] = Attribute.wrap('key1', 'value1') - assert isinstance(actor.dict_['key1'], Attribute) + dict_['key1'] = models.Attribute.wrap('key1', 'value1') + assert isinstance(actor.dict_['key1'], models.Attribute) def test_update(self, actor, dict_): dict_['key1'] = 'value1' @@ -145,7 +155,7 @@ class TestDict(CollectionInstrumentation): dict_.update(new_dict) assert len(dict_) == 2 assert dict_['key2'] == 'value2' - assert isinstance(actor.dict_['key2'], Attribute) + assert isinstance(actor.dict_['key2'], models.Attribute) new_dict = {} new_dict.update(dict_) @@ -172,20 +182,20 @@ class TestDict(CollectionInstrumentation): class TestList(CollectionInstrumentation): def test_append(self, actor, list_): - list_.append(Attribute.wrap('name', 'value1')) + list_.append(models.Attribute.wrap('name', 'value1')) list_.append('value2') assert len(actor.list_) == 2 assert len(list_) == 2 - assert isinstance(actor.list_[0], Attribute) + assert isinstance(actor.list_[0], models.Attribute) assert list_[0] == 'value1' - assert isinstance(actor.list_[1], Attribute) + assert isinstance(actor.list_[1], models.Attribute) assert list_[1] == 'value2' list_[0] = 'new_value1' list_[1] = 'new_value2' - assert isinstance(actor.list_[1], Attribute) - assert isinstance(actor.list_[1], Attribute) + assert isinstance(actor.list_[1], models.Attribute) + assert isinstance(actor.list_[1], models.Attribute) assert list_[0] == 'new_value1' assert list_[1] == 'new_value2' @@ -214,12 +224,12 @@ class TestList(CollectionInstrumentation): list_.append([]) list_[0].append('inner_item') - assert isinstance(actor.list_[0], Attribute) + assert isinstance(actor.list_[0], models.Attribute) assert len(list_) == 1 assert list_[0][0] == 'inner_item' list_[0].append('new_item') - assert isinstance(actor.list_[0], Attribute) + assert isinstance(actor.list_[0], models.Attribute) assert len(list_) == 1 assert list_[0][1] == 'new_item' @@ -231,23 +241,85 @@ class TestDictList(CollectionInstrumentation): def test_dict_in_list(self, actor, list_): list_.append({}) assert len(list_) == 1 - assert isinstance(actor.list_[0], Attribute) + assert isinstance(actor.list_[0], models.Attribute) assert actor.list_[0].value == {} list_[0]['key'] = 'value' assert list_[0]['key'] == 'value' assert len(actor.list_) == 1 - assert isinstance(actor.list_[0], Attribute) + assert isinstance(actor.list_[0], models.Attribute) assert actor.list_[0].value['key'] == 'value' def test_list_in_dict(self, actor, dict_): dict_['key'] = [] assert len(dict_) == 1 - assert isinstance(actor.dict_['key'], Attribute) + assert isinstance(actor.dict_['key'], models.Attribute) assert actor.dict_['key'].value == [] dict_['key'].append('value') assert dict_['key'][0] == 'value' assert len(actor.dict_) == 1 - assert isinstance(actor.dict_['key'], Attribute) + assert isinstance(actor.dict_['key'], models.Attribute) assert actor.dict_['key'].value[0] == 'value' + + +class TestModelInstrumentation(object): + + @pytest.fixture + def workflow_ctx(self, tmpdir): + context = mock.context.simple(str(tmpdir), inmemory=True) + yield context + storage.release_sqlite_storage(context.model) + + def test_attributes_access(self, workflow_ctx): + node = workflow_ctx.model.node.list()[0] + task = models.Task(node=node) + workflow_ctx.model.task.put(task) + + ctx = operation.NodeOperationContext( + task.id, node.id, name='', service_id=workflow_ctx.model.service.list()[0].id, + model_storage=workflow_ctx.model, resource_storage=workflow_ctx.resource, + execution_id=1) + + def _run_assertions(is_under_ctx): + def ctx_assert(expr): + if is_under_ctx: + assert expr + else: + assert not expr + + ctx_assert(isinstance(ctx.node.attributes, + collection_instrumentation._InstrumentedDict)) + assert not isinstance(ctx.node.properties, + collection_instrumentation._InstrumentedCollection) + + for rel in ctx.node.inbound_relationships: + ctx_assert(isinstance(rel, collection_instrumentation._WrappedModel)) + ctx_assert(isinstance(rel.source_node.attributes, + collection_instrumentation._InstrumentedDict)) + ctx_assert(isinstance(rel.target_node.attributes, + collection_instrumentation._InstrumentedDict)) + + for node in ctx.model.node: + ctx_assert(isinstance(node.attributes, + collection_instrumentation._InstrumentedDict)) + assert not isinstance(node.properties, + collection_instrumentation._InstrumentedCollection) + + for rel in ctx.model.relationship: + ctx_assert(isinstance(rel, collection_instrumentation._WrappedModel)) + + ctx_assert(isinstance(rel.source_node.attributes, + collection_instrumentation._InstrumentedDict)) + ctx_assert(isinstance(rel.target_node.attributes, + collection_instrumentation._InstrumentedDict)) + + assert not isinstance(rel.source_node.properties, + collection_instrumentation._InstrumentedCollection) + assert not isinstance(rel.target_node.properties, + collection_instrumentation._InstrumentedCollection) + + with ctx.model.instrument(models.Node.attributes): + _run_assertions(True) + + _run_assertions(False) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py index 1b19fd9..7ab1bdb 100644 --- a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py +++ b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py @@ -138,7 +138,7 @@ class TestCtxProxy(object): @pytest.fixture def ctx(self, mocker): class MockCtx(object): - pass + INSTRUMENTATION_FIELDS = () ctx = MockCtx() properties = { 'prop1': 'value1', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index 899a007..8b326e7 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -422,15 +422,24 @@ class TestFabricEnvHideGroupsAndRunCommands(object): raise RuntimeError class _Ctx(object): + INSTRUMENTATION_FIELDS = () + class Task(object): @staticmethod def abort(message=None): models.Task.abort(message) actor = None + class Actor(object): host = None + + class Model(object): + @contextlib.contextmanager + def instrument(self, *args, **kwargs): + yield task = Task task.actor = Actor + model = Model() logger = logging.getLogger() @staticmethod @@ -439,7 +448,6 @@ class TestFabricEnvHideGroupsAndRunCommands(object): yield _Ctx.logging_handlers = _mock_self_logging - @pytest.fixture(autouse=True) def _setup(self, mocker): self.default_fabric_env = { http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index a717e19..c0d3616 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -100,7 +100,7 @@ class TestOperationTask(object): storage_task = ctx.model.task.get_by_name(core_task.name) assert storage_task.plugin is storage_plugin assert storage_task.execution_name == ctx.execution.name - assert storage_task.actor == core_task.context.node._original_model + assert storage_task.actor == core_task.context.node assert core_task.model_task == storage_task assert core_task.name == api_task.name assert core_task.function == api_task.function http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79a1f761/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index e4944df..7969457 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -66,7 +66,8 @@ class MockProcessExecutorExtension(object): def decorate(self): def decorator(function): def wrapper(ctx, **operation_arguments): - ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments} + with ctx.model.instrument(ctx.model.node.model_cls.attributes): + ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments} function(ctx=ctx, **operation_arguments) return wrapper return decorator