This is an automated email from the ASF dual-hosted git repository.

klueska pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new cfa60e4  Added TaskIO to new CLI.
cfa60e4 is described below

commit cfa60e41e53ee1f5a647a7ecd26d0fedd24bc392
Author: Armand Grillet <armand.gril...@outlook.com>
AuthorDate: Wed Oct 10 06:35:13 2018 +0200

    Added TaskIO to new CLI.
    
    This code was pulled directly from:
    https://github.com/dcos/dcos-core-cli/blob/
        7fd55421939a7782c237e2b8719c0fe2f543acd7/
            python/lib/dcos/dcos/mesos.py
    
    It will be used by the new CLI for commands such as 'task exec'.
    Changes have been made to work with our HTTP Python library and
    make the code pass when linted by Pylint.
    
    Review: https://reviews.apache.org/r/68978
---
 src/python/cli_new/lib/cli/mesos.py | 693 ++++++++++++++++++++++++++++++++++++
 1 file changed, 693 insertions(+)

diff --git a/src/python/cli_new/lib/cli/mesos.py 
b/src/python/cli_new/lib/cli/mesos.py
index cab02f6..4a5777b 100644
--- a/src/python/cli_new/lib/cli/mesos.py
+++ b/src/python/cli_new/lib/cli/mesos.py
@@ -18,9 +18,31 @@
 Functions to handle agents.
 """
 
+import base64
+import json
+import os
+import platform
+import signal
+import sys
+import threading
+import time
+import tty
+import uuid
+
+from functools import partial
+from queue import Queue
+
+import termios
+
 from cli import http
+from cli import util
 from cli.exceptions import CLIException
 
+import mesos.http
+from mesos import recordio
+from mesos.exceptions import MesosException
+from mesos.exceptions import MesosHTTPException
+
 
 def get_agent_address(agent_id, master):
     """
@@ -113,3 +135,674 @@ def get_tasks(master):
             .format(key=key, endpoint=endpoint))
 
     return data[key]
+
+class TaskIO():
+    """
+    Object used to stream I/O between a
+    running Mesos task and the local terminal.
+
+    :param task: task ID
+    :type task: str
+    :param cmd: a command to launch inside the task's container
+    :type cmd: str
+    :param args: Additional arguments for the command
+    :type args: str
+    :param interactive: whether to attach STDIN of the current
+                        terminal to the new command being launched
+    :type interactive: bool
+    :param tty: whether to allocate a tty for this command and attach
+                the local terminal to it
+    :type tty: bool
+    """
+    # pylint: disable=too-many-instance-attributes
+
+    # The interval to send heartbeat messages to
+    # keep persistent connections alive.
+    HEARTBEAT_INTERVAL = 30
+    HEARTBEAT_INTERVAL_NANOSECONDS = HEARTBEAT_INTERVAL * 1000000000
+
+    def __init__(self, master, task_id):
+        # Get the task and make sure its container was launched by the UCR.
+        # Since task's containers are launched by the UCR by default, we want
+        # to allow most tasks to pass through unchecked. The only exception is
+        # when a task has an explicit container specified and it is not of type
+        # "MESOS". Having a type of "MESOS" implies that it was launched by the
+        # UCR -- all other types imply it was not.
+        try:
+            tasks = get_tasks(master)
+        except Exception as exception:
+            raise CLIException("Unable to get tasks from leading"
+                               " master '{master}': {error}"
+                               .format(master=master, error=exception))
+
+        running_tasks = [t for t in tasks if t["state"] == "TASK_RUNNING"]
+        matching_tasks = [t for t in running_tasks if t["id"] == task_id]
+
+        if not matching_tasks:
+            raise CLIException("Unable to find running task '{task_id}'"
+                               " from leading master '{master}'"
+                               .format(task_id=task_id, master=master))
+
+        if len(matching_tasks) > 1:
+            raise CLIException("More than one task matching id '{id}'"
+                               .format(id=task_id))
+
+
+        task_obj = matching_tasks[0]
+
+        if "container" in task_obj:
+            if "type" in task_obj["container"]:
+                if task_obj["container"]["type"] != "MESOS":
+                    raise CLIException(
+                        "This command is only supported for tasks"
+                        " launched by the Universal Container Runtime (UCR).")
+
+        # Get the URL to the agent running the task.
+        agent_addr = util.sanitize_address(
+            get_agent_address(task_obj["slave_id"], master))
+        self.agent_url = mesos.http.simple_urljoin(agent_addr, "api/v1")
+
+        # Get the agent's task path by checking the `state` endpoint.
+        try:
+            self.container_id = get_container_id(task_obj)
+        except CLIException as exception:
+            raise CLIException("Could not get container ID of task '{id}'"
+                               " from agent '{addr}': {error}"
+                               .format(id=task_id, addr=agent_addr,
+                                       error=exception))
+
+        # Set up a recordio encoder and decoder
+        # for any incoming and outgoing messages.
+        self.encoder = recordio.Encoder(
+            lambda s: bytes(json.dumps(s, ensure_ascii=False), "UTF-8"))
+        self.decoder = recordio.Decoder(
+            lambda s: json.loads(s.decode("UTF-8")))
+
+        # Set up queues to send messages between threads used for
+        # reading/writing to STDIN/STDOUT/STDERR and threads
+        # sending/receiving data over the network.
+        self.input_queue = Queue()
+        self.output_queue = Queue()
+
+        # Set up an event to block attaching
+        # input until attaching output is complete.
+        self.attach_input_event = threading.Event()
+        self.attach_input_event.clear()
+
+        # Set up an event to block printing the output
+        # until an attach input event has successfully
+        # been established.
+        self.print_output_event = threading.Event()
+        self.print_output_event.clear()
+
+        # Set up an event to block the main thread
+        # from exiting until signaled to do so.
+        self.exit_event = threading.Event()
+        self.exit_event.clear()
+
+        # Use a class variable to store exceptions thrown on
+        # other threads and raise them on the main thread before
+        # exiting.
+        self.exception = None
+
+        # Default values for the TaskIO.
+        self.cmd = None
+        self.args = None
+        self.interactive = False
+        self.tty = False
+        self.output_thread_entry_point = None
+
+        # Allow an exit sequence to be used to break the CLIs attachment to
+        # the remote task. Depending on the call, this may be disabled, or
+        # the exit sequence to be used may be overwritten.
+        self.supports_exit_sequence = True
+        self.exit_sequence = b'\x10\x11'  # Ctrl-p, Ctrl-q
+        self.exit_sequence_detected = False
+
+    def attach(self, _no_stdin=False):
+        """
+        Attach the stdin/stdout/stderr of the CLI to the
+        STDIN/STDOUT/STDERR of a running task.
+
+        As of now, we can only attach to tasks launched with a remote TTY
+        already set up for them. If we try to attach to a task that was
+        launched without a remote TTY attached, this command will fail.
+
+        :param task: task ID pattern to match
+        :type task: str
+        :param no_stdin: True if we should *not* attach stdin,
+                         False if we should
+        :type no_stdin: bool
+        """
+
+        # Store relevant parameters of the call for later.
+        self.interactive = not _no_stdin
+        self.tty = True
+
+        # Set the entry point of the output thread to be a call to
+        # _attach_container_output.
+        self.output_thread_entry_point = self._attach_container_output
+
+        self._run()
+
+    def exec(self, _cmd, _args=None, _interactive=False, _tty=False):
+        """
+        Execute a new process inside of a given task by redirecting
+        STDIN/STDOUT/STDERR between the CLI and the Mesos Agent API.
+
+        If a tty is requested, we take over the current terminal and
+        put it into raw mode. We make sure to reset the terminal back
+        to its original settings before exiting.
+
+        :param cmd: The command to launch inside the task's container
+        :type args: cmd
+        :param args: Additional arguments for the command
+        :type args: list
+        :param interactive: attach stdin
+        :type interactive: bool
+        :param tty: attach a tty
+        :type tty: bool
+        """
+
+        # Store relevant parameters of the call for later.
+        self.cmd = _cmd
+        self.args = _args
+        self.interactive = _interactive
+        self.tty = _tty
+
+        # Override the container ID with the current container ID as the
+        # parent, and generate a new UUID for the nested container used to
+        # run commands passed to `task exec`.
+        self.container_id = {
+            'parent': self.container_id,
+            'value': str(uuid.uuid4())
+        }
+
+        # Set the entry point of the output thread to be a call to
+        # _launch_nested_container_session.
+        self.output_thread_entry_point = self._launch_nested_container_session
+
+        self._run()
+
+    def _run(self):
+        """
+        Run the helper threads in this class which enable streaming
+        of STDIN/STDOUT/STDERR between the CLI and the Mesos Agent API.
+
+        If a tty is requested, we take over the current terminal and
+        put it into raw mode. We make sure to reset the terminal back
+        to its original settings before exiting.
+        """
+
+        # Without a TTY.
+        if not self.tty:
+            try:
+                self._start_threads()
+                self.exit_event.wait()
+            except Exception as e:
+                self.exception = e
+
+            if self.exception:
+                # Known Pylint issue: 
https://github.com/PyCQA/pylint/issues/157
+                # pylint: disable=raising-bad-type
+                raise self.exception
+            return
+
+        # With a TTY.
+        if platform.system() == "Windows":
+            raise CLIException(
+                "Running with the '--tty' flag is not supported on windows")
+
+        if not sys.stdin.isatty():
+            raise CLIException(
+                "Must be running in a tty to pass the '--tty flag'")
+
+        fd = sys.stdin.fileno()
+        oldtermios = termios.tcgetattr(fd)
+
+        try:
+            if self.interactive:
+                tty.setraw(fd, when=termios.TCSANOW)
+                # To force a redraw of the remote terminal, we first resize it
+                # to 0 before setting it to the actual size of our local
+                # terminal. After that, all terminal resizing is handled in our
+                # SIGWINCH handler.
+                self._window_resize(signal.SIGWINCH, dimensions=[0, 0])
+                self._window_resize(signal.SIGWINCH)
+                signal.signal(signal.SIGWINCH, self._window_resize)
+
+            self._start_threads()
+            self.exit_event.wait()
+        except Exception as e:
+            self.exception = e
+
+        termios.tcsetattr(
+            sys.stdin.fileno(),
+            termios.TCSAFLUSH,
+            oldtermios)
+
+        if self.exception:
+            # Known Pylint issue: https://github.com/PyCQA/pylint/issues/157
+            # pylint: disable=raising-bad-type
+            raise self.exception
+
+    def _thread_wrapper(self, func):
+        """
+        A wrapper around all threads used in this class.
+
+        If a thread throws an exception, it will unblock the main
+        thread and save the exception in a class variable. The main
+        thread will then rethrow the exception before exiting.
+
+        :param func: The start function for the thread
+        :type func: function
+        """
+        try:
+            func()
+        except Exception as e:
+            self.exception = e
+            self.exit_event.set()
+
+    def _start_threads(self):
+        """
+        Start all threads associated with this class.
+        """
+        if self.interactive:
+            # Collects input from STDIN and puts
+            # it in the input_queue as data messages.
+            thread = threading.Thread(
+                target=self._thread_wrapper,
+                args=(self._input_thread,))
+            thread.daemon = True
+            thread.start()
+
+            # Prepares heartbeat control messages and
+            # puts them in the input queueaat a specific
+            # heartbeat interval.
+            thread = threading.Thread(
+                target=self._thread_wrapper,
+                args=(self._heartbeat_thread,))
+            thread.daemon = True
+            thread.start()
+
+            # Opens a persistent connection with the mesos agent and
+            # feeds it both control and data messages from the input
+            # queue via ATTACH_CONTAINER_INPUT messages.
+            thread = threading.Thread(
+                target=self._thread_wrapper,
+                args=(self._attach_container_input,))
+            thread.daemon = True
+            thread.start()
+
+        # Opens a persistent connection with a mesos agent, reads
+        # data messages from it and feeds them to an output_queue.
+        thread = threading.Thread(
+            target=self._thread_wrapper,
+            args=(self.output_thread_entry_point,))
+        thread.daemon = True
+        thread.start()
+
+        # Collects data messages from the output queue and writes
+        # their content to STDOUT and STDERR.
+        thread = threading.Thread(
+            target=self._thread_wrapper,
+            args=(self._output_thread,))
+        thread.daemon = True
+        thread.start()
+
+    def _attach_container_output(self):
+        """
+        Streams all output data (e.g. STDOUT/STDERR) to the
+        client from the agent.
+        """
+
+        message = {
+            'type': 'ATTACH_CONTAINER_OUTPUT',
+            'attach_container_output': {
+                'container_id': self.container_id}}
+
+        req_extra_args = {
+            'stream': True,
+            'additional_headers': {
+                'Content-Type': 'application/json',
+                'Accept': 'application/recordio',
+                'Message-Accept': 'application/json'}}
+
+        try:
+            resource = mesos.http.Resource(self.agent_url)
+            response = resource.request(
+                mesos.http.METHOD_POST,
+                data=json.dumps(message),
+                retry=False,
+                timeout=None,
+                **req_extra_args)
+        except MesosHTTPException as e:
+            text = "I/O switchboard server was disabled for this container"
+            if e.response.status_code == 500 and e.response.text == text:
+                raise CLIException("Unable to attach to a task"
+                                   " launched without a TTY")
+            raise e
+
+        self._process_output_stream(response)
+
+    def _launch_nested_container_session(self):
+        """
+        Sends a request to the Mesos Agent to launch a new
+        nested container and attach to its output stream.
+        The output stream is then sent back in the response.
+        """
+        message = {
+            'type': "LAUNCH_NESTED_CONTAINER_SESSION",
+            'launch_nested_container_session': {
+                'container_id': self.container_id,
+                'command': {
+                    'value': self.cmd,
+                    'arguments': [self.cmd] + self.args,
+                    'shell': False}}}
+
+        if self.tty:
+            message[
+                'launch_nested_container_session'][
+                    'container'] = {
+                        'type': 'MESOS',
+                        'tty_info': {}}
+
+        req_extra_args = {
+            'stream': True,
+            'additional_headers': {
+                'Content-Type': 'application/json',
+                'Accept': 'application/recordio',
+                'Message-Accept': 'application/json'}}
+
+        resource = mesos.http.Resource(self.agent_url)
+        try:
+            response = resource.request(
+                mesos.http.METHOD_POST,
+                data=json.dumps(message),
+                retry=False,
+                timeout=None,
+                **req_extra_args)
+        except MesosException as exception:
+            raise CLIException("{error}".format(error=exception))
+
+        self._process_output_stream(response)
+
+    def _process_output_stream(self, response):
+        """
+        Gets data streamed over the given response and places the
+        returned messages into our output_queue. Only expects to
+        receive data messages.
+
+        :param response: Response from an http post
+        :type response: requests.models.Response
+        """
+
+        # Now that we are ready to process the output stream (meaning
+        # our output connection has been established), allow the input
+        # stream to be attached by setting an event.
+        self.attach_input_event.set()
+
+        # If we are running in interactive mode, wait to make sure that
+        # our input connection succeeds before pushing any output to the
+        # output queue.
+        if self.interactive:
+            self.print_output_event.wait()
+
+        try:
+            for chunk in response.iter_content(chunk_size=None):
+                records = self.decoder.decode(chunk)
+
+                for r in records:
+                    if r.get('type') and r['type'] == 'DATA':
+                        self.output_queue.put(r['data'])
+        except Exception as e:
+            raise CLIException(
+                "Error parsing output stream: {error}".format(error=e))
+
+        self.output_queue.join()
+        self.exit_event.set()
+
+    def _attach_container_input(self):
+        """
+        Streams all input data (e.g. STDIN) from the client to the agent.
+        """
+
+        def _initial_input_streamer():
+            """
+            Generator function yielding the initial ATTACH_CONTAINER_INPUT
+            message for streaming. We have a separate generator for this so
+            that we can attempt the connection once before committing to a
+            persistent connection where we stream the rest of the input.
+
+            :returns: A RecordIO encoded message
+            """
+
+            message = {
+                'type': 'ATTACH_CONTAINER_INPUT',
+                'attach_container_input': {
+                    'type': 'CONTAINER_ID',
+                    'container_id': self.container_id}}
+
+            yield self.encoder.encode(message)
+
+        def _input_streamer():
+            """
+            Generator function yielding ATTACH_CONTAINER_INPUT messages for
+            streaming. It yields the _intitial_input_streamer() message,
+            followed by messages from the input_queue on each subsequent call.
+
+            :returns: A RecordIO encoded message
+            """
+            yield next(_initial_input_streamer(), None)
+
+            while True:
+                record = self.input_queue.get()
+                if not record:
+                    if self.exit_sequence_detected:
+                        sys.stdout.write("\r\n")
+                        sys.stdout.flush()
+                        self.exit_event.set()
+                    break
+                yield record
+
+        req_extra_args = {
+            'additional_headers': {
+                'Content-Type': 'application/recordio',
+                'Message-Content-Type': 'application/json',
+                'Accept': 'application/json',
+                'Connection': 'close',
+                'Transfer-Encoding': 'chunked'
+            }
+        }
+
+        # Ensure we don't try to attach our input to a container that isn't
+        # fully up and running by waiting until the
+        # `_process_output_stream` function signals us that it's ready.
+        self.attach_input_event.wait()
+
+        # Send an intial "Test" message to ensure that we are able to
+        # establish a connection with the agent. If we aren't we will throw
+        # an exception and break out of this thread. However, in cases where
+        # we receive a 500 response from the agent, we actually want to
+        # continue without throwing an exception. A 500 error indicates that
+        # we can't connect to the container because it has already finished
+        # running. In that case we continue running to allow the output queue
+        # to be flushed.
+        resource = mesos.http.Resource(self.agent_url)
+        try:
+            resource.request(
+                mesos.http.METHOD_POST,
+                data=_initial_input_streamer(),
+                retry=False,
+                **req_extra_args)
+        except MesosHTTPException as e:
+            if not e.response.status_code == 500:
+                raise e
+
+        # If we succeeded with that connection, unblock process_output_stream()
+        # from sending output data to the output thread.
+        self.print_output_event.set()
+
+        # Begin streaming the input.
+        resource = mesos.http.Resource(self.agent_url)
+        resource.request(
+            mesos.http.METHOD_POST,
+            data=_input_streamer(),
+            retry=False,
+            timeout=None,
+            **req_extra_args)
+
+    def _detect_exit_sequence(self, chunk):
+        """
+        Detects if 'self.exit_sequence' is present in 'chunk'.
+
+        If a partial exit sequence is detected at the end of 'chunk', then
+        more characters are read from 'stdin' and appended to 'chunk' in
+        search of the full sequence. Since python cannot pass variables by
+        reference, we return a modified 'chunk' with the extra characters
+        read if necessary.
+
+        If the exit sequence is found, the class variable
+        'exit_sequence_detected' is set to True.
+
+        :param chunk: a byte array to search for the exit sequence in
+        :type chunk: byte array
+        :returns: a modified byte array containing the original 'chunk' plus
+                  any extra characters read in search of the exit sequence
+        :rtype: byte array
+        """
+        if not self.supports_exit_sequence:
+            return chunk
+
+        if chunk.find(self.exit_sequence) != -1:
+            self.exit_sequence_detected = True
+            return chunk
+
+        for i in reversed(range(1, len(self.exit_sequence))):
+            if self.exit_sequence[:-i] == chunk[len(chunk)-i:]:
+                chunk += os.read(sys.stdin.fileno(), 1)
+                return self._detect_exit_sequence(chunk)
+
+        return chunk
+
+    def _input_thread(self):
+        """
+        Reads from STDIN and places a message
+        with that data onto the input_queue.
+        """
+
+        message = {
+            'type': 'ATTACH_CONTAINER_INPUT',
+            'attach_container_input': {
+                'type': 'PROCESS_IO',
+                'process_io': {
+                    'type': 'DATA',
+                    'data': {
+                        'type': 'STDIN',
+                        'data': ''}}}}
+
+        for chunk in iter(partial(os.read, sys.stdin.fileno(), 1024), b''):
+            chunk = self._detect_exit_sequence(chunk)
+            if self.exit_sequence_detected:
+                break
+
+            message[
+                'attach_container_input'][
+                    'process_io'][
+                        'data'][
+                            'data'] = base64.b64encode(chunk).decode('utf-8')
+
+            self.input_queue.put(self.encoder.encode(message))
+
+        # Push an empty string to indicate EOF to the server and push
+        # 'None' to signal that we are done processing input.
+        message['attach_container_input']['process_io']['data']['data'] = ''
+        self.input_queue.put(self.encoder.encode(message))
+        self.input_queue.put(None)
+
+    def _output_thread(self):
+        """
+        Reads from the output_queue and writes the data
+        to the appropriate STDOUT or STDERR.
+        """
+
+        while True:
+            # Get a message from the output queue and decode it.
+            # Then write the data to the appropriate stdout or stderr.
+            output = self.output_queue.get()
+            if not output.get('data'):
+                raise CLIException("Error no 'data' field in output message")
+
+            data = output['data']
+            data = base64.b64decode(data.encode('utf-8'))
+
+            if output.get('type') and output['type'] == 'STDOUT':
+                sys.stdout.buffer.write(data)
+                sys.stdout.flush()
+            elif output.get('type') and output['type'] == 'STDERR':
+                sys.stderr.buffer.write(data)
+                sys.stderr.flush()
+            else:
+                raise CLIException("Unsupported data type in output stream")
+
+            self.output_queue.task_done()
+
+    def _heartbeat_thread(self):
+        """
+        Generates a heartbeat message to send over the ATTACH_CONTAINER_INPUT
+        stream every `interval` seconds and inserts it in the input queue.
+        """
+
+        interval = self.HEARTBEAT_INTERVAL
+        nanoseconds = self.HEARTBEAT_INTERVAL_NANOSECONDS
+
+        message = {
+            'type': 'ATTACH_CONTAINER_INPUT',
+            'attach_container_input': {
+                'type': 'PROCESS_IO',
+                'process_io': {
+                    'type': 'CONTROL',
+                    'control': {
+                        'type': 'HEARTBEAT',
+                        'heartbeat': {
+                            'interval': {
+                                'nanoseconds': nanoseconds}}}}}}
+
+        while True:
+            self.input_queue.put(self.encoder.encode(message))
+            time.sleep(interval)
+
+    def _window_resize(self, signum=None, frame=None, dimensions=None):
+        # pylint: disable=unused-argument
+        """
+        Signal handler for SIGWINCH.
+
+        Generates a message with the current dimensions of the
+        terminal and puts it in the input_queue.
+
+        :param signum: the signal number being handled
+        :type signum: int
+        :param frame: current stack frame
+        :type frame: frame
+        """
+
+        # Determine the size of our terminal, and create the message to be sent
+        if dimensions:
+            rows, columns = dimensions
+        else:
+            rows, columns = os.popen('stty size', 'r').read().split()
+
+        message = {
+            'type': 'ATTACH_CONTAINER_INPUT',
+            'attach_container_input': {
+                'type': 'PROCESS_IO',
+                'process_io': {
+                    'type': 'CONTROL',
+                    'control': {
+                        'type': 'TTY_INFO',
+                        'tty_info': {
+                            'window_size': {
+                                'rows': int(rows),
+                                'columns': int(columns)}}}}}}
+
+        self.input_queue.put(self.encoder.encode(message))

Reply via email to