Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-42-Generic-ctx-serialization-mechanism 6da1ba132 -> 77165a4fb
some rework Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/77165a4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/77165a4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/77165a4f Branch: refs/heads/ARIA-42-Generic-ctx-serialization-mechanism Commit: 77165a4fbc8fbb2e7612bbe816b45d764a07ab30 Parents: 6da1ba1 Author: mxmrlv <[email protected]> Authored: Wed Feb 8 12:26:44 2017 +0200 Committer: mxmrlv <[email protected]> Committed: Wed Feb 8 12:26:44 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/context/operation.py | 27 ++++++++ aria/orchestrator/context/serialization.py | 73 -------------------- aria/orchestrator/runner.py | 19 ++--- aria/orchestrator/workflows/executor/process.py | 5 +- aria/storage/core.py | 8 +++ .../workflows/executor/test_executor.py | 5 +- .../workflows/executor/test_process_executor.py | 17 +++-- 7 files changed, 57 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 23a6fd4..d1f61b2 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -17,6 +17,7 @@ Workflow and operation contexts """ +import aria from aria.utils import file from .common import BaseContext @@ -73,6 +74,32 @@ class BaseOperationContext(BaseContext): file.makedirs(plugin_workdir) return plugin_workdir + @property + def serialization_dict(self): + context_cls = self.__class__ + context_dict = { + 'name': self.name, + 'deployment_id': self._deployment_id, + 'task_id': self._task_id, + 'actor_id': 
self._actor_id, + 'workdir': self._workdir, + 'model_storage': self.model.serialization_dict if self.model else None, + 'resource_storage': self.resource.serialization_dict if self.resource else None + } + return { + 'context_cls': context_cls, + 'context': context_dict + } + + @classmethod + def deserialize_from_dict(cls, model_storage=None, resource_storage=None, **kwargs): + if model_storage: + model_storage = aria.application_model_storage(**model_storage) + if resource_storage: + resource_storage = aria.application_resource_storage(**resource_storage) + + return cls(model_storage=model_storage, resource_storage=resource_storage, **kwargs) + class NodeOperationContext(BaseOperationContext): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/orchestrator/context/serialization.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/serialization.py b/aria/orchestrator/context/serialization.py deleted file mode 100644 index 705a63b..0000000 --- a/aria/orchestrator/context/serialization.py +++ /dev/null @@ -1,73 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import aria - - -def operation_context_to_dict(context): - context_cls = context.__class__ - context_dict = { - 'name': context.name, - 'deployment_id': context._deployment_id, - 'task_id': context._task_id, - 'actor_id': context._actor_id, - 'workdir': context._workdir - } - if context.model: - model = context.model - context_dict['model_storage'] = { - 'api_cls': model.api, - 'initiator': model._initiator, - 'initiator_kwargs': model._initiator_kwargs, - } - - else: - context_dict['model_storage'] = None - if context.resource: - resource = context.resource - context_dict['resource_storage'] = { - 'api_cls': resource.api, - 'initiator': resource._initiator, - 'initiator_kwargs': resource._initiator_kwargs, - } - else: - context_dict['resource_storage'] = None - return { - 'context_cls': context_cls, - 'context': context_dict - } - - -def operation_context_from_dict(context_dict): - context_cls = context_dict['context_cls'] - context = context_dict['context'] - - model_storage = context['model_storage'] - if model_storage: - api_cls = model_storage['api_cls'] - initiator_kwargs = model_storage['initiator_kwargs'] - init_func = model_storage['initiator'] - context['model_storage'] = aria.application_model_storage( - api_cls, initiator_kwargs=initiator_kwargs, initiator=init_func) - - resource_storage = context['resource_storage'] - if resource_storage: - api_cls = resource_storage['api_cls'] - initiator_kwargs = resource_storage['initiator_kwargs'] - init_func = resource_storage['initiator'] - context['resource_storage'] = aria.application_resource_storage( - api=api_cls, initiator_kwargs=initiator_kwargs, initiator=init_func) - - return context_cls(**context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/orchestrator/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py index 6f998be..97fe410 100644 --- a/aria/orchestrator/runner.py 
+++ b/aria/orchestrator/runner.py @@ -17,22 +17,20 @@ Workflow runner """ -import platform import tempfile import os -from sqlalchemy import (create_engine, orm) # @UnresolvedImport -from sqlalchemy.pool import StaticPool # @UnresolvedImport - from .context.workflow import WorkflowContext from .workflows.core.engine import Engine from .workflows.executor.thread import ThreadExecutor from ..storage import ( - model, sql_mapi, filesystem_rapi, ) -from .. import (application_model_storage, application_resource_storage) +from .. import ( + application_model_storage, + application_resource_storage +) SQLITE_IN_MEMORY = 'sqlite:///:memory:' @@ -98,12 +96,7 @@ class Runner(object): task_retry_interval=1) - @staticmethod - def create_fs_resource_storage(directory='.'): - return - def cleanup(self): - if (self._is_storage_temporary and - (self._storage_path is not None) and - os.path.isfile(self._storage_path)): + if (self._is_storage_temporary and (self._storage_path is not None) and + os.path.isfile(self._storage_path)): os.remove(self._storage_path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 23a4f52..c4b8ba1 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -46,7 +46,6 @@ from aria.extension import process_executor from aria.utils import imports from aria.utils import exceptions from aria.orchestrator.workflows.executor import base -from aria.orchestrator.context import serialization from aria.storage import instrumentation from aria.storage import type as storage_type @@ -194,7 +193,7 @@ class ProcessExecutor(base.BaseExecutor): 'operation_mapping': task.operation_mapping, 'operation_inputs': task.inputs, 'port': self._server_port, - 
'context': serialization.operation_context_to_dict(task.context), + 'context': task.context.serialization_dict, } def _update_env(self, env, plugin_prefix): @@ -328,7 +327,7 @@ def _main(): with instrumentation.track_changes() as instrument: try: - ctx = serialization.operation_context_from_dict(context_dict) + ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) task_func = imports.load_attribute(operation_mapping) aria.install_aria_extensions() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py index c45f2f2..4f7cd02 100644 --- a/aria/storage/core.py +++ b/aria/storage/core.py @@ -84,6 +84,14 @@ class Storage(LoggerMixin): except KeyError: return super(Storage, self).__getattribute__(item) + @property + def serialization_dict(self): + return { + 'api': self.api, + 'initiator': self._initiator, + 'initiator_kwargs': self._initiator_kwargs + } + def register(self, entry): """ Register the entry to the storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 2486a1e..0b33ae6 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -30,6 +30,7 @@ except ImportError: from aria.storage import model from aria.orchestrator import events +from aria.orchestrator.context import operation from aria.orchestrator.workflows.executor import ( thread, process, @@ -80,9 +81,9 @@ class MockException(Exception): pass -class MockContext(object): +class 
MockContext(operation.BaseOperationContext): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called pass def __getattr__(self, item): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/77165a4f/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 4972e61..9f0d1db 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -22,11 +22,16 @@ from contextlib import contextmanager import pytest from aria import application_model_storage -from aria.storage import model as aria_model +from aria.storage import ( + model as aria_model, + sql_mapi +) +from aria.orchestrator import ( + events, + plugin +) +from aria.orchestrator.context import operation from aria.utils.plugin import create as create_plugin -from aria.storage import sql_mapi -from aria.orchestrator import events -from aria.orchestrator import plugin from aria.orchestrator.workflows.executor import process @@ -107,9 +112,9 @@ def mock_plugin(plugin_manager, tmpdir): return plugin_manager.install(source=plugin_path) -class MockContext(object): +class MockContext(operation.BaseOperationContext): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called pass def __getattr__(self, item):
