Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-46-execution-plugin b12c12429 -> 76337bb47 (forced update)
ARIA-46 Execution plugin Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/76337bb4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/76337bb4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/76337bb4 Branch: refs/heads/ARIA-46-execution-plugin Commit: 76337bb47475eb160910f0953d957dd7691ab3b1 Parents: 3caf177 Author: Dan Kilman <d...@gigaspaces.com> Authored: Tue Jan 3 17:00:46 2017 +0200 Committer: Dan Kilman <d...@gigaspaces.com> Committed: Sun Jan 8 11:37:49 2017 +0200 ---------------------------------------------------------------------- MANIFEST.in | 1 + aria/orchestrator/execution_plugin/__init__.py | 33 ++ aria/orchestrator/execution_plugin/common.py | 146 ++++++++ aria/orchestrator/execution_plugin/constants.py | 51 +++ .../execution_plugin/ctx_proxy/__init__.py | 16 + .../execution_plugin/ctx_proxy/client.py | 105 ++++++ .../execution_plugin/ctx_proxy/server.py | 240 +++++++++++++ .../execution_plugin/environment_globals.py | 60 ++++ .../orchestrator/execution_plugin/exceptions.py | 36 ++ aria/orchestrator/execution_plugin/local.py | 123 +++++++ .../orchestrator/execution_plugin/operations.py | 59 +++ .../execution_plugin/ssh/__init__.py | 14 + .../execution_plugin/ssh/operations.py | 210 +++++++++++ .../orchestrator/execution_plugin/ssh/tunnel.py | 91 +++++ aria/orchestrator/workflows/api/task.py | 12 +- aria/orchestrator/workflows/core/task.py | 3 +- aria/storage/base_model.py | 26 +- requirements.txt | 3 + setup.py | 9 +- tests/orchestrator/execution_plugin/__init__.py | 14 + .../execution_plugin/test_ctx_proxy_server.py | 355 +++++++++++++++++++ 21 files changed, 1596 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/MANIFEST.in 
---------------------------------------------------------------------- diff --git a/MANIFEST.in b/MANIFEST.in index f9bd145..6b08eeb 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1,2 @@ include requirements.txt +include aria/orchestrator/execution_plugin/ctx_proxy/binaries/ctx http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/aria/orchestrator/execution_plugin/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/__init__.py b/aria/orchestrator/execution_plugin/__init__.py new file mode 100644 index 0000000..372022f --- /dev/null +++ b/aria/orchestrator/execution_plugin/__init__.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from contextlib import contextmanager

# Module-level slots exposed to python scripts while they execute.
# Outside of ``python_script_scope`` both are always ``None``.
ctx = None
inputs = None


@contextmanager
def python_script_scope(operation_ctx, operation_inputs):
    """Expose *operation_ctx*/*operation_inputs* as module globals for the
    duration of the ``with`` block, then unconditionally reset them.

    Python scripts evaluated in-process import ``ctx`` and ``inputs`` from
    this package; the scope guarantees they never leak across operations.
    """
    global ctx, inputs
    ctx, inputs = operation_ctx, operation_inputs
    try:
        yield
    finally:
        # always restore the pristine state, even if the script raised
        ctx, inputs = None, None
def is_windows():
    """Return True when running on a Windows interpreter."""
    return os.name == 'nt'


def download_script(ctx, script_path):
    """Fetch *script_path* locally and return the local filesystem path.

    ``http``/``https`` URLs are downloaded into a temporary file whose name
    ends with the URL's basename; any other path is treated as a deployment
    resource and fetched through ``ctx.download_resource``.

    :param ctx: operation context (used for abort and resource download)
    :param script_path: URL or resource path of the script
    :return: path to a local copy of the script
    """
    split = script_path.split('://')
    schema = split[0]
    if schema in ['http', 'https']:
        response = requests.get(script_path)
        # BUG FIX: previously only 404 aborted; any non-200 response body
        # (500, 403, ...) was silently written out and executed as a script.
        if response.status_code != 200:
            ctx.task.abort('Failed to download script: {0} (status code: {1})'
                           .format(script_path, response.status_code))
        suffix = script_path.split('/')[-1]
        file_descriptor, local_path = tempfile.mkstemp(suffix='-{0}'.format(suffix))
        os.close(file_descriptor)
        with open(local_path, 'wb') as f:
            # BUG FIX: write the raw bytes; the file is opened in binary mode
            # and ``response.text`` (decoded unicode) can corrupt non-ascii
            # scripts or fail to encode.
            f.write(response.content)
        return local_path
    else:
        return ctx.download_resource(script_path)


def create_process_config(script_path, process, operation_kwargs, quote_json_env_vars=False):
    """Update a process config dict with its environment variables and command.

    Each operation kwarg becomes an environment variable; non-string values
    (dict/list/tuple/bool) are JSON encoded. Values already present in
    ``process['env']`` take precedence over operation kwargs. The final
    command line is assembled as ``[command_prefix] script_path [args...]``.

    :param script_path: path of the script to execute
    :param process: dict describing the process (may be ``None``);
        recognized keys: ``env``, ``args``, ``command_prefix``, ``cwd``
    :param operation_kwargs: operation inputs to expose as env vars
        (a ``ctx`` key, if present, is stripped — it is not serializable)
    :param quote_json_env_vars: wrap JSON values in single quotes
        (needed when the value will pass through a POSIX shell)
    :return: the process dict updated with ``env`` and ``command``
    :rtype: dict
    """
    process = process or {}
    env_vars = operation_kwargs.copy()
    if 'ctx' in env_vars:
        del env_vars['ctx']
    env_vars.update(process.get('env', {}))
    for k, v in env_vars.items():
        if isinstance(v, (dict, list, tuple, bool)):
            env_var_value = json.dumps(v)
            if is_windows():
                # These <k,v> environment variables will subsequently
                # be used in a subprocess.Popen() call, as the `env` parameter.
                # In some windows python versions, if an environment variable
                # name is not of type str (e.g. unicode), the Popen call will
                # fail.
                k = str(k)
                # The windows shell removes all double quotes - escape them
                # to still be able to pass JSON in env vars to the shell.
                env_var_value = env_var_value.replace('"', '\\"')
            if quote_json_env_vars:
                env_var_value = "'{0}'".format(env_var_value)
            env_vars[k] = env_var_value
    process['env'] = env_vars
    args = process.get('args')
    command = script_path
    command_prefix = process.get('command_prefix')
    if command_prefix:
        command = '{0} {1}'.format(command_prefix, command)
    if args:
        command = ' '.join([command] + args)
    process['command'] = command
    return process


def patch_ctx(ctx):
    """Replace ``ctx.task.abort``/``retry`` with variants that record their
    outcome on ``ctx._return_value`` instead of raising immediately, and add
    a ``ctx.returns`` hook for scripts to hand back a value.

    Exactly one of abort/retry/returns may be invoked per operation; a second
    invocation records (and raises) a ``RuntimeError``.
    The originals are preserved as ``_original_abort``/``_original_retry`` so
    ``handle_return_value`` can replay the recorded outcome later.
    """
    ctx._return_value = None
    task = ctx.task
    task._original_abort = task.abort
    task._original_retry = task.retry

    def _validate_legal_action():
        # a previous abort/retry/returns already set a value — that's illegal
        if ctx._return_value is not None:
            ctx._return_value = RuntimeError(constants.ILLEGAL_CTX_OPERATION_MESSAGE)
            raise ctx._return_value

    def abort_operation(message=None):
        _validate_legal_action()
        ctx._return_value = exceptions.ScriptException(message=message, retry=False)
        return ctx._return_value
    task.abort = abort_operation

    def retry_operation(message=None, retry_interval=None):
        _validate_legal_action()
        ctx._return_value = exceptions.ScriptException(message=message,
                                                       retry=True,
                                                       retry_interval=retry_interval)
        return ctx._return_value
    task.retry = retry_operation

    def returns(value):
        _validate_legal_action()
        ctx._return_value = value
    ctx.returns = returns


def handle_return_value(ctx, error_check_func=None, reraise=False):
    """Replay the outcome recorded by a patched ctx (see ``patch_ctx``).

    :param ctx: context previously passed through ``patch_ctx``
    :param error_check_func: optional callable run after the ctx outcome is
        handled; expected to raise on process-level failures
    :param reraise: re-raise the currently handled exception; only valid when
        called from inside an ``except`` block
    :return: the value handed back via ``ctx.returns`` (or the recorded
        ScriptException / None)
    """
    return_value = ctx._return_value
    # this happens when more than 1 ctx return action is invoked
    if isinstance(return_value, RuntimeError):
        ctx.task._original_abort(str(return_value))
    elif isinstance(return_value, exceptions.ScriptException):
        if return_value.retry:
            ctx.task._original_retry(return_value.message, return_value.retry_interval)
        else:
            ctx.task._original_abort(return_value.message)
    if error_check_func:
        error_check_func()
    if reraise:
        # bare raise: re-raises the exception currently being handled
        raise
    return return_value
from . import exceptions

# -- local execution ---------------------------------------------------------
PYTHON_SCRIPT_FILE_EXTENSION = '.py'
POWERSHELL_SCRIPT_FILE_EXTENSION = '.ps1'
DEFAULT_POWERSHELL_EXECUTABLE = 'powershell'

# -- shared by local and ssh execution ---------------------------------------
ILLEGAL_CTX_OPERATION_MESSAGE = 'ctx may only abort or return once'

# -- ssh execution ------------------------------------------------------------
DEFAULT_BASE_DIR = '/tmp/aria-ctx'

# Defaults merged into the fabric environment used for remote execution.
FABRIC_ENV_DEFAULTS = {
    'connection_attempts': 5,
    'timeout': 10,
    'forward_agent': False,
    'abort_on_prompts': True,
    'keepalive': 0,
    'linewise': False,
    'pool_size': 0,
    'skip_bad_hosts': False,
    'status': False,
    'disable_known_hosts': True,
    'combine_stderr': True,
    'abort_exception': exceptions.TaskException,
}

# Output groups accepted by fabric's hide/show output controls.
VALID_FABRIC_GROUPS = {
    'status',
    'aborts',
    'warnings',
    'running',
    'stdout',
    'stderr',
    'user',
    'everything',
}
+# See the License for the specific language governing permissions and +# limitations under the License. + +from . import server, client http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/aria/orchestrator/execution_plugin/ctx_proxy/client.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py new file mode 100644 index 0000000..d965a5e --- /dev/null +++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py @@ -0,0 +1,105 @@ +#! /usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import json +import os +import sys +import urllib2 + + +# Environment variable for the socket url (used by clients to locate the socket) +CTX_SOCKET_URL = 'CTX_SOCKET_URL' + + +class _RequestError(RuntimeError): + + def __init__(self, ex_message, ex_type, ex_traceback): + super(_RequestError, self).__init__(self, '{0}: {1}'.format(ex_type, ex_message)) + self.ex_type = ex_type + self.ex_message = ex_message + self.ex_traceback = ex_traceback + + +def _http_request(socket_url, request, timeout): + response = urllib2.urlopen( + url=socket_url, + data=json.dumps(request), + timeout=timeout) + if response.code != 200: + raise RuntimeError('Request failed: {0}'.format(response)) + return json.loads(response.read()) + + +def _client_request(socket_url, args, timeout): + response = _http_request( + socket_url=socket_url, + request={'args': args}, + timeout=timeout) + payload = response['payload'] + response_type = response.get('type') + if response_type == 'error': + ex_type = payload['type'] + ex_message = payload['message'] + ex_traceback = payload['traceback'] + raise _RequestError(ex_message, ex_type, ex_traceback) + elif response_type == 'stop_operation': + raise SystemExit(payload['message']) + else: + return payload + + +def _parse_args(args): + parser = argparse.ArgumentParser() + parser.add_argument('-t', '--timeout', type=int, default=30) + parser.add_argument('--socket-url', default=os.environ.get(CTX_SOCKET_URL)) + parser.add_argument('--json-arg-prefix', default='@') + parser.add_argument('-j', '--json-output', action='store_true') + parser.add_argument('args', nargs='*') + args = parser.parse_args(args=args) + if not args.socket_url: + raise RuntimeError('Missing CTX_SOCKET_URL environment variable ' + 'or socket_url command line argument. 
(ctx is supposed to be executed ' + 'within an operation context)') + return args + + +def _process_args(json_prefix, args): + processed_args = [] + for arg in args: + if arg.startswith(json_prefix): + arg = json.loads(arg[1:]) + processed_args.append(arg) + return processed_args + + +def main(args=None): + args = _parse_args(args) + response = _client_request( + socket_url=args.socket_url, + args=_process_args(args.json_arg_prefix, args.args), + timeout=args.timeout) + if args.json_output: + response = json.dumps(response) + else: + if not response: + response = '' + response = str(response) + sys.stdout.write(response) + + +if __name__ == '__main__': + main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py new file mode 100644 index 0000000..4bb6d23 --- /dev/null +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -0,0 +1,240 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import collections
import json
import re
import socket
import threading
import traceback
import Queue
import StringIO
import wsgiref.simple_server

import bottle

from .. import exceptions


class CtxProxy(object):
    """HTTP proxy exposing an operation ``ctx`` object to external processes.

    A small bottle application is served on an ephemeral localhost port from
    a daemon thread. Clients (see the companion ``client`` module) POST JSON
    bodies of the form ``{'args': [...]}``; the argument list is translated
    into attribute access / method calls / dict path access on the wrapped
    ``ctx``. Usable as a context manager: the server shuts down on exit.
    """

    def __init__(self, ctx):
        self.ctx = ctx
        self.port = _get_unused_port()
        self.socket_url = 'http://localhost:{0}'.format(self.port)
        self.server = None
        # used only to block __init__ until the wsgi server is listening
        self._started = Queue.Queue(1)
        self.thread = self._start_server()
        self._started.get(timeout=5)

    def _start_server(self):
        # captured by the nested adapter so it can publish the real server
        # object and the bound port back to this proxy instance
        proxy = self

        class BottleServerAdapter(bottle.ServerAdapter):
            def run(self, app):
                class Server(wsgiref.simple_server.WSGIServer):
                    allow_reuse_address = True

                    def handle_error(self, request, client_address):
                        # swallow per-request socket errors (e.g. a client
                        # disconnecting mid-response) instead of printing
                        pass

                class Handler(wsgiref.simple_server.WSGIRequestHandler):
                    def address_string(self):
                        # skip the reverse-DNS lookup the default does
                        return self.client_address[0]

                    def log_request(*args, **kwargs):  # pylint: disable=no-method-argument
                        # NOTE: deliberately no ``self`` parameter — the
                        # ``self`` below closes over the enclosing run()'s
                        # adapter instance, whose ``quiet`` flag is checked
                        if not self.quiet:
                            return wsgiref.simple_server.WSGIRequestHandler.log_request(*args,
                                                                                        **kwargs)
                server = wsgiref.simple_server.make_server(
                    host=self.host,
                    port=self.port,
                    app=app,
                    server_class=Server,
                    handler_class=Handler)
                proxy.server = server
                self.port = server.server_port
                # unblock the proxy's __init__ — the server is about to serve
                proxy._started.put(True)
                server.serve_forever(poll_interval=0.1)

        def serve():
            bottle_app = bottle.Bottle()
            bottle_app.post('/', callback=self._request_handler)
            bottle.run(
                app=bottle_app,
                host='localhost',
                port=self.port,
                quiet=True,
                server=BottleServerAdapter)
        thread = threading.Thread(target=serve)
        thread.daemon = True
        thread.start()
        return thread

    def close(self):
        # shut down the serve_forever loop and release the listening socket
        if self.server:
            self.server.shutdown()
            self.server.server_close()

    def _request_handler(self):
        # raw request body; processing (including error serialization)
        # happens in _process so every response is well-formed JSON
        request = bottle.request.body.read()  # pylint: disable=no-member
        response = self._process(request)
        return bottle.LocalResponse(
            body=response,
            status=200,
            headers={'content-type': 'application/json'})

    def _process(self, request):
        """Evaluate a client request against ctx; always return JSON text.

        Response shape: ``{'type': 'result'|'stop_operation'|'error',
        'payload': ...}``. ScriptException payloads signal the client to
        stop the operation; any other exception is serialized as 'error'.
        """
        try:
            typed_request = json.loads(request)
            args = typed_request['args']
            payload = _process_ctx_request(self.ctx, args)
            result_type = 'result'
            if isinstance(payload, exceptions.ScriptException):
                payload = dict(message=str(payload))
                result_type = 'stop_operation'
            result = json.dumps({
                'type': result_type,
                'payload': payload
            })
        except Exception as e:
            traceback_out = StringIO.StringIO()
            traceback.print_exc(file=traceback_out)
            payload = {
                'type': type(e).__name__,
                'message': str(e),
                'traceback': traceback_out.getvalue()
            }
            result = json.dumps({
                'type': 'error',
                'payload': payload
            })
        return result

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()


def _process_ctx_request(ctx, args):
    """Walk *args* over *ctx*: each arg is an attribute lookup until a dict
    (path get/set) or a callable (invocation, trailing dict = kwargs) is hit.

    If the final value is callable it is invoked with no arguments, so e.g.
    ``['node', 'name']`` works whether ``name`` is a property or a method.
    """
    current = ctx
    num_args = len(args)
    index = 0
    while index < num_args:
        arg = args[index]
        attr = _desugar_attr(current, arg)
        if attr:
            current = getattr(current, attr)
        elif isinstance(current, collections.MutableMapping):
            key = arg
            path_dict = _PathDictAccess(current)
            if index + 1 == num_args:
                # read dict prop by path
                value = path_dict.get(key)
                current = value
            elif index + 2 == num_args:
                # set dict prop by path
                value = args[index + 1]
                current = path_dict.set(key, value)
            else:
                raise RuntimeError('Illegal argument while accessing dict')
            break
        elif callable(current):
            kwargs = {}
            remaining_args = args[index:]
            if isinstance(remaining_args[-1], collections.MutableMapping):
                kwargs = remaining_args[-1]
                remaining_args = remaining_args[:-1]
            current = current(*remaining_args, **kwargs)
            break
        else:
            raise RuntimeError('{0} cannot be processed in {1}'.format(arg, args))
        index += 1
    if callable(current):
        current = current()
    return current


def _desugar_attr(obj, attr):
    """Return the attribute name *attr* maps to on *obj*, trying a
    dash-to-underscore translation (cli style), or None if neither exists."""
    if not isinstance(attr, basestring):
        return None
    if hasattr(obj, attr):
        return attr
    attr = attr.replace('-', '_')
    if hasattr(obj, attr):
        return attr
    return None
return None + + +class _PathDictAccess(object): + pattern = re.compile(r"(.+)\[(\d+)\]") + + def __init__(self, obj): + self.obj = obj + + def set(self, prop_path, value): + obj, prop_name = self._get_parent_obj_prop_name_by_path(prop_path) + obj[prop_name] = value + return value + + def get(self, prop_path): + value = self._get_object_by_path(prop_path) + return value + + def _get_object_by_path(self, prop_path, fail_on_missing=True): + # when setting a nested object, make sure to also set all the + # intermediate path objects + current = self.obj + for prop_segment in prop_path.split('.'): + match = self.pattern.match(prop_segment) + if match: + index = int(match.group(2)) + property_name = match.group(1) + if property_name not in current: + self._raise_illegal(prop_path) + if not isinstance(current[property_name], list): + self._raise_illegal(prop_path) + current = current[property_name][index] + else: + if prop_segment not in current: + if fail_on_missing: + self._raise_illegal(prop_path) + else: + current[prop_segment] = {} + current = current[prop_segment] + return current + + def _get_parent_obj_prop_name_by_path(self, prop_path): + split = prop_path.split('.') + if len(split) == 1: + return self.obj, prop_path + parent_path = '.'.join(split[:-1]) + parent_obj = self._get_object_by_path(parent_path, fail_on_missing=False) + prop_name = split[-1] + return parent_obj, prop_name + + @staticmethod + def _raise_illegal(prop_path): + raise RuntimeError('illegal path: {0}'.format(prop_path)) + + +def _get_unused_port(): + sock = socket.socket() + sock.bind(('127.0.0.1', 0)) + _, port = sock.getsockname() + sock.close() + return port http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/aria/orchestrator/execution_plugin/environment_globals.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/environment_globals.py b/aria/orchestrator/execution_plugin/environment_globals.py new 
def create_initial_globals(path):
    """Emulate the ``globals()`` of a freshly loaded ``__main__`` module.

    Why a dedicated module, and why copy-then-patch instead of hard coding a
    dict? This function lives alone so that the only user-defined name that
    leaks into the copied globals is its own — which it then removes. Hard
    coding the handful of dunder entries (__doc__, __file__, __name__,
    __package__, __builtins__, ...) would couple us to one particular
    ``globals`` implementation; copying tracks whatever the interpreter
    actually provides.

    :param path: value to install as the script's ``__file__``
    :return: a dict suitable as the globals of a dynamically executed script
    """
    initial = globals().copy()
    initial.update({
        '__doc__': 'Dynamically executed script',
        '__file__': path,
        '__name__': '__main__',
        '__package__': None
    })
    # remove this function itself from the emulated namespace
    del initial[create_initial_globals.__name__]
    return initial
class ProcessException(Exception):
    """Raised when a launched script process fails.

    Carries the full execution record: the command line, its exit code and
    the captured stdout/stderr. The exception message is the stderr output.
    """

    def __init__(self, command, exit_code, stdout, stderr):
        super(ProcessException, self).__init__(stderr)
        self.command = command
        self.exit_code = exit_code
        self.stdout = stdout
        self.stderr = stderr


class TaskException(Exception):
    """Generic task failure; also installed as fabric's abort exception."""


class ScriptException(Exception):
    """Outcome recorded when a script invokes ctx abort/retry.

    ``retry`` distinguishes a retry request from an abort; ``retry_interval``
    optionally delays the retry.
    """

    def __init__(self, message, retry, retry_interval=None):
        super(ScriptException, self).__init__(message)
        self.retry = retry
        self.retry_interval = retry_interval
import os
import subprocess
import threading
import StringIO

from . import ctx_proxy
from . import exceptions
from . import common
from . import constants
from . import environment_globals
from . import python_script_scope


def run_script(ctx, script_path, process, **kwargs):
    """Run a script on the local machine.

    The script is first downloaded via the operation context; python scripts
    are (by default) evaluated in-process, anything else is executed as a
    subprocess.

    :param ctx: the operation context
    :param script_path: resource path of the script to run
    :param process: optional process configuration dict (env, cwd, args, ...)
    :param kwargs: exposed to the script as operation inputs
    :return: whatever value the script set via the ctx return mechanism
    """
    process = process or {}
    script_path = common.download_script(ctx, script_path)
    script_func = _get_run_script_func(script_path, process)
    return script_func(
        ctx=ctx,
        script_path=script_path,
        process=process,
        operation_kwargs=kwargs)


def _get_run_script_func(script_path, process):
    """Select the runner: in-process python evaluation or subprocess
    execution (with a powershell prefix for .ps1 scripts)."""
    if _treat_script_as_python_script(script_path, process):
        return _eval_script_func
    if _treat_script_as_powershell_script(script_path):
        process.setdefault('command_prefix', constants.DEFAULT_POWERSHELL_EXECUTABLE)
    return _execute_func


def _treat_script_as_python_script(script_path, process):
    # A script is python if `eval_python` is explicitly True, or if it has a
    # .py extension and `eval_python` was not explicitly set to False.
    eval_python = process.get('eval_python')
    script_extension = os.path.splitext(script_path)[1].lower()
    return (eval_python is True or (script_extension == constants.PYTHON_SCRIPT_FILE_EXTENSION and
                                    eval_python is not False))


def _treat_script_as_powershell_script(script_path):
    script_extension = os.path.splitext(script_path)[1].lower()
    return script_extension == constants.POWERSHELL_SCRIPT_FILE_EXTENSION


def _eval_script_func(script_path, ctx, _, operation_kwargs):
    # Evaluate the python script in this process with freshly created globals.
    with python_script_scope(operation_ctx=ctx, operation_inputs=operation_kwargs):
        execfile(script_path, environment_globals.create_initial_globals(script_path))


def _execute_func(script_path, ctx, process, operation_kwargs):
    """Execute the script as a subprocess, serving ctx requests over a local
    proxy socket while it runs."""
    # fix: was the python2-only octal literal 0755; 0o755 (owner rwx,
    # group/other rx) is valid on python 2.6+ and python 3
    os.chmod(script_path, 0o755)
    process = common.create_process_config(
        script_path=script_path,
        process=process,
        operation_kwargs=operation_kwargs)
    command = process['command']
    env = os.environ.copy()
    env.update(process['env'])
    ctx.logger.info('Executing: {0}'.format(command))
    common.patch_ctx(ctx)
    with ctx_proxy.server.CtxProxy(ctx) as proxy:
        # the child process finds the proxy through this env var
        env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url
        running_process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            cwd=process.get('cwd'),
            bufsize=1,
            close_fds=not common.is_windows())
        # consume both pipes on background threads so the child never blocks
        # on a full pipe buffer
        stdout_consumer = _OutputConsumer(running_process.stdout)
        stderr_consumer = _OutputConsumer(running_process.stderr)
        exit_code = running_process.wait()
        stdout_consumer.join()
        stderr_consumer.join()
        ctx.logger.info('Execution done (exit_code={0}): {1}'.format(exit_code, command))

        def error_check_func():
            if exit_code:
                raise exceptions.ProcessException(
                    command=command,
                    exit_code=exit_code,
                    stdout=stdout_consumer.read_output(),
                    stderr=stderr_consumer.read_output())
        return common.handle_return_value(ctx, error_check_func=error_check_func)


class _OutputConsumer(object):
    """Drains a process output stream into a buffer on a daemon thread."""

    def __init__(self, out):
        self._out = out
        self._buffer = StringIO.StringIO()
        self._consumer = threading.Thread(target=self._consume_output)
        self._consumer.daemon = True
        self._consumer.start()

    def _consume_output(self):
        for line in iter(self._out.readline, b''):
            self._buffer.write(line)
        self._out.close()

    def read_output(self):
        return self._buffer.getvalue()

    def join(self):
        self._consumer.join()
from . import local as local_operations
from .ssh import operations as ssh_operations


def run_script_locally(ctx,
                       script_path,
                       process=None,
                       **kwargs):
    """Operation entry point: run a script on the local machine."""
    return local_operations.run_script(
        ctx=ctx,
        script_path=script_path,
        process=process,
        **kwargs)


def run_script_with_ssh(ctx,
                        script_path,
                        fabric_env=None,
                        process=None,
                        use_sudo=False,
                        hide_output=None,
                        **kwargs):
    """Operation entry point: run a script on a remote host over ssh."""
    return ssh_operations.run_script(
        ctx=ctx,
        script_path=script_path,
        fabric_env=fabric_env,
        process=process,
        # bug fix: this was passed as `use_use=use_sudo`, which fell into
        # **kwargs and left ssh_operations.run_script without its required
        # `use_sudo` argument
        use_sudo=use_sudo,
        hide_output=hide_output,
        **kwargs)


def run_commands_with_ssh(ctx,
                          commands,
                          fabric_env=None,
                          use_sudo=False,
                          hide_output=None,
                          **_):
    """Operation entry point: run a list of commands on a remote host over ssh."""
    return ssh_operations.run_commands(
        ctx=ctx,
        commands=commands,
        fabric_env=fabric_env,
        use_sudo=use_sudo,
        hide_output=hide_output)
import os
import random
import string
import StringIO

import fabric.api
import fabric.context_managers
import fabric.contrib.files

from .. import constants
from .. import exceptions
from .. import common
from .. import ctx_proxy
from . import tunnel


# Path of the ctx proxy client script uploaded to remote hosts; prefer the
# .py source over a compiled .pyc.
_PROXY_CLIENT_PATH = ctx_proxy.client.__file__
if _PROXY_CLIENT_PATH.endswith('.pyc'):
    _PROXY_CLIENT_PATH = _PROXY_CLIENT_PATH[:-1]


def run_commands(ctx, commands, fabric_env, use_sudo, hide_output, **_):
    """Runs the provided commands in sequence over ssh.

    :param ctx: the operation context
    :param commands: a list of commands to run
    :param fabric_env: fabric configuration
    :param use_sudo: whether to run each command under sudo
    :param hide_output: fabric output groups to hide (see constants)
    :raises exceptions.ProcessException: on the first failing command
    """
    with fabric.api.settings(_hide_output(ctx, groups=hide_output),
                             **_fabric_env(ctx, fabric_env, warn_only=True)):
        for command in commands:
            ctx.logger.info('Running command: {0}'.format(command))
            run = fabric.api.sudo if use_sudo else fabric.api.run
            result = run(command)
            if result.failed:
                raise exceptions.ProcessException(
                    command=result.command,
                    exit_code=result.return_code,
                    stdout=result.stdout,
                    stderr=result.stderr)


def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output, **kwargs):
    """Download a script and run it on a remote host over ssh.

    The ctx proxy client and an environment script are uploaded alongside the
    script; ctx calls made by the remote script are forwarded back to this
    process through a reverse ssh tunnel.

    :param ctx: the operation context
    :param script_path: resource path of the script to run
    :param fabric_env: fabric configuration
    :param process: optional process configuration dict (env, cwd, args, ...)
    :param use_sudo: whether to run the script under sudo
    :param hide_output: fabric output groups to hide
    :param kwargs: exposed to the script as operation inputs
    :return: whatever value the script set via the ctx return mechanism
    """
    process = process or {}
    paths = _Paths(base_dir=process.get('base_dir', constants.DEFAULT_BASE_DIR),
                   local_script_path=common.download_script(ctx, script_path))
    with fabric.api.settings(_hide_output(ctx, groups=hide_output),
                             **_fabric_env(ctx, fabric_env, warn_only=False)):
        # the remote host must have the ctx before running any fabric scripts
        if not fabric.contrib.files.exists(paths.remote_ctx_path):
            # there may be race conditions with other operations that
            # may be running in parallel, so we pass -p to make sure
            # we get 0 exit code if the directory already exists
            fabric.api.run('mkdir -p {0} && mkdir -p {1}'.format(paths.remote_scripts_dir,
                                                                 paths.remote_work_dir))
            # this file has to be present before using ctx
            fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path)
        _patch_ctx(ctx, paths.remote_work_dir)
        process = common.create_process_config(
            script_path=paths.remote_script_path,
            process=process,
            operation_kwargs=kwargs,
            quote_json_env_vars=True)
        fabric.api.put(paths.local_script_path, paths.remote_script_path)
        with ctx_proxy.server.CtxProxy(ctx) as proxy:
            local_port = proxy.port
            with fabric.context_managers.cd(process.get('cwd', paths.remote_work_dir)):  # pylint: disable=not-context-manager
                with tunnel.remote(ctx, local_port=local_port) as remote_port:
                    # NOTE(review): rewriting the url by substring replacement
                    # assumes the port digits don't appear elsewhere in the
                    # url — presumably safe for the socket urls produced by
                    # CtxProxy, but worth confirming
                    local_socket_url = proxy.socket_url
                    remote_socket_url = local_socket_url.replace(str(local_port),
                                                                 str(remote_port))
                    env_script = _write_environment_script_file(
                        process=process,
                        paths=paths,
                        local_socket_url=local_socket_url,
                        remote_socket_url=remote_socket_url)
                    fabric.api.put(env_script, paths.remote_env_script_path)
                    try:
                        command = 'source {0} && {1}'.format(paths.remote_env_script_path,
                                                             process['command'])
                        run = fabric.api.sudo if use_sudo else fabric.api.run
                        run(command)
                    except exceptions.TaskException:
                        return common.handle_return_value(ctx, reraise=True)
        return common.handle_return_value(ctx)


def _patch_ctx(ctx, remote_work_dir):
    """Wrap ctx.download_resource[_and_render] so downloaded resources are
    also uploaded to the remote host, returning the remote path."""
    common.patch_ctx(ctx)
    original_download_resource = ctx.download_resource
    original_download_resource_and_render = ctx.download_resource_and_render

    def fabric_put_in_remote_path(local_target_path, target_path):
        if target_path:
            remote_target_path = target_path
        else:
            # default to the work dir, keeping the original file name
            remote_target_path = '{0}/{1}'.format(remote_work_dir,
                                                  os.path.basename(local_target_path))
        fabric.api.put(local_target_path, remote_target_path)
        return remote_target_path

    def download_resource(resource_path, target_path=None):
        local_target_path = original_download_resource(resource_path)
        return fabric_put_in_remote_path(local_target_path, target_path)
    ctx.download_resource = download_resource

    def download_resource_and_render(resource_path,
                                     target_path=None,
                                     template_variables=None):
        local_target_path = original_download_resource_and_render(
            resource_path,
            template_variables=template_variables)
        return fabric_put_in_remote_path(local_target_path, target_path)
    ctx.download_resource_and_render = download_resource_and_render


def _hide_output(ctx, groups):
    """ Hides Fabric's output for every 'entity' in `groups` """
    groups = set(groups or [])
    if not groups.issubset(constants.VALID_FABRIC_GROUPS):
        ctx.task.abort('`hide_output` must be a subset of {0} (Provided: {1})'
                       .format(', '.join(constants.VALID_FABRIC_GROUPS), ', '.join(groups)))
    return fabric.api.hide(*groups)


def _fabric_env(ctx, fabric_env, warn_only):
    """Prepares fabric environment variables configuration"""
    ctx.logger.debug('Preparing fabric environment...')
    env = constants.FABRIC_ENV_DEFAULTS.copy()
    env.update(fabric_env or {})
    env.setdefault('warn_only', warn_only)
    # defaults handling
    if 'host_string' not in env:
        env['host_string'] = ctx.task.runs_on.ip
    # validations
    if not env.get('host_string'):
        ctx.task.abort('`host_string` not supplied and ip cannot be deduced automatically')
    if not (env.get('password') or env.get('key_filename') or env.get('key')):
        ctx.task.abort(
            'Access credentials not supplied '
            '(you must supply at least one of `key_filename`, `key` or `password`)')
    if not env.get('user'):
        ctx.task.abort('`user` not supplied')
    ctx.logger.debug('Environment prepared successfully')
    return env


def _write_environment_script_file(process, paths, local_socket_url, remote_socket_url):
    """Build an in-memory shell script that prepares the remote environment
    (PATH/PYTHONPATH, exec bits, ctx socket urls and user-provided env vars).

    :return: a StringIO suitable for fabric.api.put
    """
    env_script = StringIO.StringIO()
    env = process['env']
    env['PATH'] = '{0}:$PATH'.format(paths.remote_ctx_dir)
    env['PYTHONPATH'] = '{0}:$PYTHONPATH'.format(paths.remote_ctx_dir)
    env_script.write('chmod +x {0}\n'.format(paths.remote_script_path))
    env_script.write('chmod +x {0}\n'.format(paths.remote_ctx_path))
    env.update({
        ctx_proxy.client.CTX_SOCKET_URL: remote_socket_url,
        'LOCAL_{0}'.format(ctx_proxy.client.CTX_SOCKET_URL): local_socket_url
    })
    for key, value in env.iteritems():
        env_script.write('export {0}={1}\n'.format(key, value))
    return env_script
class _Paths(object):
    """Computes the local and remote filesystem locations used by run_script."""

    def __init__(self, base_dir, local_script_path):
        self.local_script_path = local_script_path
        self.remote_ctx_dir = base_dir
        # random suffix prevents collisions between concurrent executions of
        # the same script on one host
        self.random_suffix = ''.join(
            random.choice(string.ascii_lowercase + string.digits) for _ in range(8))

    @property
    def base_script_path(self):
        return os.path.basename(self.local_script_path)

    @property
    def remote_ctx_path(self):
        # the uploaded ctx proxy client executable
        return '{0}/ctx'.format(self.remote_ctx_dir)

    @property
    def remote_scripts_dir(self):
        return '{0}/scripts'.format(self.remote_ctx_dir)

    @property
    def remote_work_dir(self):
        return '{0}/work'.format(self.remote_ctx_dir)

    @property
    def _remote_path_suffix(self):
        return '{0}-{1}'.format(self.base_script_path, self.random_suffix)

    @property
    def remote_env_script_path(self):
        return '{0}/env-{1}'.format(self.remote_scripts_dir, self._remote_path_suffix)

    @property
    def remote_script_path(self):
        return '{0}/{1}'.format(self.remote_scripts_dir, self._remote_path_suffix)
# This implementation was copied from the Fabric project directly:
# https://github.com/fabric/fabric/blob/master/fabric/context_managers.py#L486
# The purpose was to remove the rtunnel creation printouts here:
# https://github.com/fabric/fabric/blob/master/fabric/context_managers.py#L547


import contextlib
import select
import socket

import fabric.api
import fabric.state
import fabric.thread_handling


@contextlib.contextmanager
def remote(ctx, local_port, remote_port=0, local_host='localhost', remote_bind_address='127.0.0.1'):
    """Create a tunnel forwarding a locally-visible port to the remote target.

    Yields the remote port number (remote_port=0 asks the server to pick one
    — see paramiko's Transport.request_port_forward).
    """
    sockets = []
    channels = []
    thread_handlers = []

    def accept(channel, *args, **kwargs):
        # Called by paramiko's transport thread for each incoming forwarded
        # connection.
        # This seemingly innocent statement seems to be doing nothing
        # but the truth is far from it!
        # calling fileno() on a paramiko channel the first time, creates
        # the required plumbing to make the channel valid for select.
        # While this would generally happen implicitly inside the _forwarder
        # function when select is called, it may already be too late and may
        # cause the select loop to hang.
        # Specifically, when new data arrives to the channel, a flag is set
        # on an "event" object which is what makes the select call work.
        # problem is this will only happen if the event object is not None
        # and it will be not-None only after channel.fileno() has been called
        # for the first time. If we wait until _forwarder calls select for the
        # first time it may be after initial data has reached the channel.
        # calling it explicitly here in the paramiko transport main event loop
        # guarantees this will not happen.
        channel.fileno()

        channels.append(channel)
        sock = socket.socket()
        sockets.append(sock)

        try:
            sock.connect((local_host, local_port))
        except Exception as e:
            # best-effort close; include any close failure in the abort message
            try:
                channel.close()
            except Exception as ex2:
                close_error = ' (While trying to close channel: {0})'.format(ex2)
            else:
                close_error = ''
            ctx.task.abort('[{0}] rtunnel: cannot connect to {1}:{2} ({3}){4}'
                           .format(fabric.api.env.host_string, local_host, local_port, e,
                                   close_error))

        thread_handler = fabric.thread_handling.ThreadHandler('fwd', _forwarder, channel, sock)
        thread_handlers.append(thread_handler)

    transport = fabric.state.connections[fabric.api.env.host_string].get_transport()
    remote_port = transport.request_port_forward(
        remote_bind_address, remote_port, handler=accept)

    try:
        yield remote_port
    finally:
        # tear down every forwarded connection, then cancel the forward itself
        for sock, chan, thread_handler in zip(sockets, channels, thread_handlers):
            sock.close()
            chan.close()
            thread_handler.thread.join()
            thread_handler.raise_if_needed()
        transport.cancel_port_forward(remote_bind_address, remote_port)


def _forwarder(chan, sock):
    # Bidirectionally forward data between a socket and a Paramiko channel.
+ while True: + read = select.select([sock, chan], [], [])[0] + if sock in read: + data = sock.recv(1024) + if len(data) == 0: + break + chan.send(data) + if chan in read: + data = chan.recv(1024) + if len(data) == 0: + break + sock.send(data) + chan.close() + sock.close() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 70324a6..8d93837 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -68,7 +68,8 @@ class OperationTask(BaseTask): retry_interval=None, ignore_failure=None, inputs=None, - plugin=None): + plugin=None, + runs_on=None): """ Creates an operation task using the name, details, node instance and any additional kwargs. :param name: the operation of the name. @@ -89,6 +90,7 @@ class OperationTask(BaseTask): if retry_interval is None else retry_interval) self.ignore_failure = (self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure) + self.runs_on = runs_on @classmethod def node_instance(cls, instance, name, inputs=None, *args, **kwargs): @@ -104,6 +106,7 @@ class OperationTask(BaseTask): operation_details=instance.node.operations[name], inputs=inputs, plugins=instance.node.plugins or [], + runs_on=model.Task.RUNS_ON_NODE, *args, **kwargs) @@ -126,18 +129,22 @@ class OperationTask(BaseTask): operation_details = getattr(instance.relationship, operation_end)[name] if operation_end == cls.SOURCE_OPERATION: plugins = instance.relationship.source_node.plugins + runs_on = model.Task.RUNS_ON_SOURCE else: plugins = instance.relationship.target_node.plugins + runs_on = model.Task.RUNS_ON_TARGET return cls._instance(instance=instance, name=name, operation_details=operation_details, inputs=inputs, plugins=plugins or [], + runs_on=runs_on, *args, **kwargs) 
@classmethod - def _instance(cls, instance, name, operation_details, inputs, plugins, *args, **kwargs): + def _instance(cls, instance, name, operation_details, inputs, plugins, runs_on, *args, + **kwargs): operation_mapping = operation_details.get('operation') operation_inputs = operation_details.get('inputs', {}) operation_inputs.update(inputs or {}) @@ -151,6 +158,7 @@ class OperationTask(BaseTask): operation_mapping=operation_mapping, inputs=operation_inputs, plugin=plugin, + runs_on=runs_on, *args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 67be2ea..1deb66a 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -135,7 +135,8 @@ class OperationTask(BaseTask): retry_interval=api_task.retry_interval, ignore_failure=api_task.ignore_failure, plugin=plugins[0] if plugins else None, - execution=self._workflow_context.execution + execution=self._workflow_context.execution, + runs_on=api_task.runs_on ) self._workflow_context.model.task.put(operation_task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/aria/storage/base_model.py ---------------------------------------------------------------------- diff --git a/aria/storage/base_model.py b/aria/storage/base_model.py index 418d3b6..6529574 100644 --- a/aria/storage/base_model.py +++ b/aria/storage/base_model.py @@ -648,6 +648,11 @@ class TaskBase(ModelMixin): WAIT_STATES = [PENDING, RETRYING] END_STATES = [SUCCESS, FAILED] + RUNS_ON_SOURCE = 'source' + RUNS_ON_TARGET = 'target' + RUNS_ON_NODE = 'node' + RUNS_ON = (RUNS_ON_NODE, RUNS_ON_SOURCE, RUNS_ON_TARGET) + @orm.validates('max_attempts') def validate_max_attempts(self, _, value): # pylint: disable=no-self-use """Validates that max attempts is 
either -1 or a positive number""" @@ -658,7 +663,7 @@ class TaskBase(ModelMixin): INFINITE_RETRIES = -1 - status = Column(Enum(*STATES), name='status', default=PENDING) + status = Column(Enum(*STATES, name='status'), default=PENDING) due_at = Column(DateTime, default=datetime.utcnow) started_at = Column(DateTime, default=None) @@ -671,6 +676,7 @@ class TaskBase(ModelMixin): # Operation specific fields operation_mapping = Column(String) inputs = Column(Dict) + _runs_on = Column(Enum(*RUNS_ON, name='runs_on'), name='runs_on') @property def actor(self): @@ -680,13 +686,23 @@ class TaskBase(ModelMixin): """ return self.node_instance or self.relationship_instance + @property + def runs_on(self): + if self._runs_on == self.RUNS_ON_NODE: + return self.node_instance + elif self._runs_on == self.RUNS_ON_SOURCE: + return self.relationship_instance.source_node_instance # pylint: disable=no-member + elif self._runs_on == self.RUNS_ON_TARGET: + return self.relationship_instance.target_node_instance # pylint: disable=no-member + return None + @classmethod - def as_node_instance(cls, instance, **kwargs): - return cls(node_instance=instance, **kwargs) + def as_node_instance(cls, instance, runs_on, **kwargs): + return cls(node_instance=instance, _runs_on=runs_on, **kwargs) @classmethod - def as_relationship_instance(cls, instance, **kwargs): - return cls(relationship_instance=instance, **kwargs) + def as_relationship_instance(cls, instance, runs_on, **kwargs): + return cls(relationship_instance=instance, _runs_on=runs_on, **kwargs) @staticmethod def abort(message=None): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/requirements.txt ---------------------------------------------------------------------- diff --git a/requirements.txt b/requirements.txt index 0005a5e..055bafb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,6 @@ CacheControl[filecache]==0.11.6 clint==0.5.1 SQLAlchemy==1.1.4 wagon==0.5.0 +bottle==0.12.11 +six==1.10.0 
+Fabric==1.13.1 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 112f13e..5a3d016 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,11 @@ except IOError: install_requires = [] +console_scripts = ['aria = aria.cli.cli:main'] +if os.environ.get('INSTALL_CTX'): + console_scripts.append('ctx = aria.orchestrator.execution_plugin.ctx_proxy.client:main') + + setup( name=_PACKAGE_NAME, version=version, @@ -77,8 +82,6 @@ setup( zip_safe=False, install_requires=install_requires, entry_points={ - 'console_scripts': [ - 'aria = aria.cli.cli:main' - ] + 'console_scripts': console_scripts } ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/tests/orchestrator/execution_plugin/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/__init__.py b/tests/orchestrator/execution_plugin/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/orchestrator/execution_plugin/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/76337bb4/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py new file mode 100644 index 0000000..f133e15 --- /dev/null +++ b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py @@ -0,0 +1,355 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os
import time
import sys
import subprocess
import StringIO

import pytest

from aria.orchestrator.execution_plugin import ctx_proxy


class TestCtxProxy(object):
    # Integration tests: start a real CtxProxy server around a mock ctx and
    # exercise it through the client's low-level request function.

    def test_attribute_access(self, server):
        response = self.request(server, 'stub_attr', 'some_property')
        assert response == 'some_value'

    def test_sugared_attribute_access(self, server):
        # dashes in request paths are accepted in place of underscores
        response = self.request(server, 'stub-attr', 'some-property')
        assert response == 'some_value'

    def test_dict_prop_access_get_key(self, server):
        response = self.request(server, 'node', 'properties', 'prop1')
        assert response == 'value1'

    def test_dict_prop_access_get_key_nested(self, server):
        response = self.request(server, 'node', 'properties', 'prop2.nested_prop1')
        assert response == 'nested_value1'

    def test_dict_prop_access_get_with_list_index(self, server):
        response = self.request(server, 'node', 'properties', 'prop3[2].value')
        assert response == 'value_2'

    def test_dict_prop_access_set(self, server, ctx):
        # a trailing argument turns the access into a set operation
        self.request(server, 'node', 'properties', 'prop4.key', 'new_value')
        self.request(server, 'node', 'properties', 'prop3[2].value', 'new_value_2')
        self.request(server, 'node', 'properties', 'prop4.some.new.path',
                     'some_new_value')
        assert ctx.node.properties['prop4']['key'] == 'new_value'
        assert ctx.node.properties['prop3'][2]['value'] == 'new_value_2'
        assert ctx.node.properties['prop4']['some']['new']['path'] == 'some_new_value'

    def test_illegal_dict_access(self, server):
        self.request(server, 'node', 'properties', 'prop4.key', 'new_value')
        with pytest.raises(RuntimeError):
            # too many arguments for a set operation
            self.request(server, 'node', 'properties', 'prop4.key', 'new_value', 'what')

    def test_method_invocation(self, server):
        args = ['arg1', 'arg2', 'arg3']
        response_args = self.request(server, 'stub-method', *args)
        assert response_args == args

    def test_method_invocation_no_args(self, server):
        response = self.request(server, 'stub-method')
        assert response == []

    def test_method_invocation_kwargs(self, server):
        # a trailing dict argument is treated as keyword arguments
        arg1 = 'arg1'
        arg2 = 'arg2'
        arg4 = 'arg4_override'
        arg5 = 'arg5'
        kwargs = dict(
            arg4=arg4,
            arg5=arg5)
        response = self.request(server, 'stub_args', arg1, arg2, kwargs)
        assert response == dict(
            arg1=arg1,
            arg2=arg2,
            arg3='arg3',
            arg4=arg4,
            args=[],
            kwargs=dict(
                arg5=arg5))

    def test_empty_return_value(self, server):
        response = self.request(server, 'stub_none')
        assert response is None

    def test_client_request_timeout(self, server):
        # server sleeps longer than the client timeout
        with pytest.raises(IOError):
            ctx_proxy.client._client_request(server.socket_url,
                                             args=['stub-sleep', '0.5'],
                                             timeout=0.1)

    def test_processing_exception(self, server):
        with pytest.raises(ctx_proxy.client._RequestError):
            self.request(server, 'property_that_does_not_exist')

    def test_not_json_serializable(self, server):
        with pytest.raises(ctx_proxy.client._RequestError):
            self.request(server, 'logger')

    def test_no_string_arg(self, server):
        args = ['stub_method', 1, 2]
        response = self.request(server, *args)
        assert response == args[1:]

    # stub attributes/callables installed on the mock ctx below

    class StubAttribute(object):
        some_property = 'some_value'

    class NodeAttribute(object):
        def __init__(self, properties):
            self.properties = properties

    @staticmethod
    def stub_method(*args):
        return args

    @staticmethod
    def stub_sleep(seconds):
        time.sleep(float(seconds))

    @staticmethod
    def stub_args(arg1, arg2, arg3='arg3', arg4='arg4', *args, **kwargs):
        return dict(
            arg1=arg1,
            arg2=arg2,
            arg3=arg3,
            arg4=arg4,
            args=args,
            kwargs=kwargs)

    @pytest.fixture
    def ctx(self):
        # minimal stand-in for an operation context
        class MockCtx(object):
            pass
        ctx = MockCtx()
        properties = {
            'prop1': 'value1',
            'prop2': {
                'nested_prop1': 'nested_value1'
            },
            'prop3': [
                {'index': 0, 'value': 'value_0'},
                {'index': 1, 'value': 'value_1'},
                {'index': 2, 'value': 'value_2'}
            ],
            'prop4': {
                'key': 'value'
            }
        }
        ctx.stub_none = None
        ctx.stub_method = self.stub_method
        ctx.stub_sleep = self.stub_sleep
        ctx.stub_args = self.stub_args
        ctx.stub_attr = self.StubAttribute()
        ctx.node = self.NodeAttribute(properties)
        return ctx

    @pytest.fixture
    def server(self, ctx):
        result = ctx_proxy.server.CtxProxy(ctx)
        yield result
        result.close()

    def request(self, server, *args):
        return ctx_proxy.client._client_request(server.socket_url, args, timeout=5)


class TestArgumentParsing(object):
    # Unit tests for the client CLI argument parsing; _client_request is
    # patched (see fixtures below) so no server is needed.

    def test_socket_url_arg(self):
        self.expected.update(dict(socket_url='sock_url'))
        ctx_proxy.client.main(['--socket-url', self.expected.get('socket_url')])

    def test_socket_url_env(self):
        expected_socket_url = 'env_sock_url'
        os.environ['CTX_SOCKET_URL'] = expected_socket_url
        self.expected.update(dict(socket_url=expected_socket_url))
        ctx_proxy.client.main([])

    def test_socket_url_missing(self):
        del os.environ['CTX_SOCKET_URL']
        with pytest.raises(RuntimeError):
            ctx_proxy.client.main([])

    def test_args(self):
        self.expected.update(dict(args=['1', '2', '3']))
        ctx_proxy.client.main(self.expected.get('args'))

    def test_timeout(self):
        self.expected.update(dict(timeout='10'))
        ctx_proxy.client.main(['--timeout', self.expected.get('timeout')])
        self.expected.update(dict(timeout='15'))
        ctx_proxy.client.main(['-t', self.expected.get('timeout')])

    def test_mixed_order(self):
        # options and positional args may be interleaved in any order
        self.expected.update(dict(
            args=['1', '2', '3'], timeout='20', socket_url='mixed_socket_url'))
        ctx_proxy.client.main(
            ['-t', self.expected.get('timeout')] +
            ['--socket-url', self.expected.get('socket_url')] +
            self.expected.get('args'))
        ctx_proxy.client.main(
            ['-t', self.expected.get('timeout')] +
            self.expected.get('args') +
            ['--socket-url', self.expected.get('socket_url')])
        ctx_proxy.client.main(
            self.expected.get('args') +
            ['-t', self.expected.get('timeout')] +
            ['--socket-url', self.expected.get('socket_url')])

    def test_json_args(self):
        # '@'-prefixed args are parsed as json
        args = ['@1', '@[1,2,3]', '@{"key":"value"}']
        expected_args = [1, [1, 2, 3], {'key': 'value'}]
        self.expected.update(dict(args=expected_args))
        ctx_proxy.client.main(args)

    def test_json_arg_prefix(self):
        args = ['_1', '@1']
        expected_args = [1, '@1']
        self.expected.update(dict(args=expected_args))
        ctx_proxy.client.main(args + ['--json-arg-prefix', '_'])

    def test_json_output(self):
        # (python_response, plain_stdout, json_stdout)
        self.assert_valid_output('string', 'string', '"string"')
        self.assert_valid_output(1, '1', '1')
        self.assert_valid_output([1, '2'], "[1, '2']", '[1, "2"]')
        self.assert_valid_output({'key': 1},
                                 "{'key': 1}",
                                 '{"key": 1}')
        self.assert_valid_output(False, '', 'false')
        self.assert_valid_output(True, 'True', 'true')
        self.assert_valid_output([], '', '[]')
        self.assert_valid_output({}, '', '{}')

    def assert_valid_output(self, response, ex_typed_output, ex_json_output):
        self.mock_response = response
        current_stdout = sys.stdout

        def run(args, expected):
            output = StringIO.StringIO()
            sys.stdout = output
            ctx_proxy.client.main(args)
            assert output.getvalue() == expected

        try:
            run([], ex_typed_output)
            run(['-j'], ex_json_output)
            run(['--json-output'], ex_json_output)
        finally:
            # always restore stdout even if an assertion failed
            sys.stdout = current_stdout

    def mock_client_request(self, socket_url, args, timeout):
        assert socket_url == self.expected.get('socket_url')
        assert args == self.expected.get('args')
        assert timeout == int(self.expected.get('timeout'))
        return self.mock_response

    @pytest.fixture(autouse=True)
    def patch_client_request(self, mocker):
        mocker.patch.object(ctx_proxy.client,
                            ctx_proxy.client._client_request.__name__,
                            self.mock_client_request)
        mocker.patch.dict('os.environ', {'CTX_SOCKET_URL': 'stub'})

    @pytest.fixture(autouse=True)
    def defaults(self):
        self.expected = dict(args=[], timeout=30, socket_url='stub')
        self.mock_response = None


class TestCtxEntryPoint(object):

    def test_ctx_in_path(self):
        # requires the package to be installed with INSTALL_CTX set so the
        # `ctx` console script exists on PATH (see setup.py in this commit)
        subprocess.check_output(['ctx', '--help'])


class TestPathDictAccess(object):
    def test_simple_set(self):
        obj
= {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo', 42) + assert obj == {'foo': 42} + + def test_nested_set(self): + obj = {'foo': {}} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo.bar', 42) + assert obj == {'foo': {'bar': 42}} + + def test_set_index(self): + obj = {'foo': [None, {'bar': 0}]} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo[1].bar', 42) + assert obj == {'foo': [None, {'bar': 42}]} + + def test_set_nonexistent_parent(self): + obj = {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo.bar', 42) + assert obj == {'foo': {'bar': 42}} + + def test_set_nonexistent_parent_nested(self): + obj = {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo.bar.baz', 42) + assert obj == {'foo': {'bar': {'baz': 42}}} + + def test_simple_get(self): + obj = {'foo': 42} + path_dict = ctx_proxy.server._PathDictAccess(obj) + result = path_dict.get('foo') + assert result == 42 + + def test_nested_get(self): + obj = {'foo': {'bar': 42}} + path_dict = ctx_proxy.server._PathDictAccess(obj) + result = path_dict.get('foo.bar') + assert result == 42 + + def test_nested_get_shadows_dotted_name(self): + obj = {'foo': {'bar': 42}, 'foo.bar': 58} + path_dict = ctx_proxy.server._PathDictAccess(obj) + result = path_dict.get('foo.bar') + assert result == 42 + + def test_index_get(self): + obj = {'foo': [0, 1]} + path_dict = ctx_proxy.server._PathDictAccess(obj) + result = path_dict.get('foo[1]') + assert result == 1 + + def test_get_nonexistent(self): + obj = {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + with pytest.raises(RuntimeError): + path_dict.get('foo') + + def test_get_by_index_not_list(self): + obj = {'foo': {0: 'not-list'}} + path_dict = ctx_proxy.server._PathDictAccess(obj) + with pytest.raises(RuntimeError): + path_dict.get('foo[0]') + + def test_get_by_index_nonexistent_parent(self): + obj = {} + path_dict = 
ctx_proxy.server._PathDictAccess(obj) + with pytest.raises(RuntimeError): + path_dict.get('foo[1]')