Give the connection and the reader/writer tasks nicknames, and add logging statements throughout.
Signed-off-by: John Snow <js...@redhat.com> --- python/qemu/aqmp/protocol.py | 64 ++++++++++++++++++++++++++++++++---- python/qemu/aqmp/util.py | 32 ++++++++++++++++++ 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index a99a191982..dd8564ee02 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -14,6 +14,7 @@ from asyncio import StreamReader, StreamWriter from enum import Enum from functools import wraps +import logging from ssl import SSLContext from typing import ( Any, @@ -34,6 +35,7 @@ create_task, flush, is_closing, + pretty_traceback, upper_half, wait_closed, wait_task_done, @@ -174,14 +176,28 @@ class AsyncProtocol(Generic[T]): can be written after the super() call. - `_on_message`: Actions to be performed when a message is received. + + :param name: + Name used for logging messages, if any. By default, messages + will log to 'qemu.aqmp.protocol', but each individual connection + can be given its own logger by giving it a name; messages will + then log to 'qemu.aqmp.protocol.${name}'. """ # pylint: disable=too-many-instance-attributes + #: Logger object for debugging messages from this connection. + logger = logging.getLogger(__name__) + # ------------------------- # Section: Public interface # ------------------------- - def __init__(self) -> None: + def __init__(self, name: Optional[str] = None) -> None: + #: The nickname for this connection, if any. + self.name: Optional[str] = name + if self.name is not None: + self.logger = self.logger.getChild(self.name) + # stream I/O self._reader: Optional[StreamReader] = None self._writer: Optional[StreamWriter] = None @@ -212,6 +228,15 @@ def __init__(self) -> None: #: An `asyncio.Event` that signals when `runstate` is changed. 
self.runstate_changed: asyncio.Event = asyncio.Event() + def __repr__(self) -> str: + argstr = '' + if self.name is not None: + argstr += f"name={self.name}" + return "{:s}({:s})".format( + type(self).__name__, + argstr, + ) + @property def runstate(self) -> Runstate: """The current `Runstate` of the connection.""" @@ -301,6 +326,8 @@ async def _new_session(self, assert self.runstate == Runstate.IDLE self._set_state(Runstate.CONNECTING) + if not self._outgoing.empty(): + self.logger.warning("Outgoing message queue was not empty!") self._outgoing = asyncio.Queue() phase = "connection" @@ -311,9 +338,15 @@ async def _new_session(self, await self._begin_new_session() except Exception as err: - # Reset from CONNECTING back to IDLE. - await self.disconnect() emsg = f"Failed to establish {phase}" + self.logger.error("%s:\n%s\n", emsg, pretty_traceback()) + try: + # Reset from CONNECTING back to IDLE. + await self.disconnect() + except: + emsg = "Unexpected bottom half exceptions" + self.logger.error("%s:\n%s\n", emsg, pretty_traceback()) + raise raise ConnectError(emsg, err) from err assert self.runstate == Runstate.RUNNING @@ -330,12 +363,16 @@ async def _do_connect(self, address: Union[str, Tuple[str, int]], :raise OSError: For stream-related errors. 
""" + self.logger.debug("Connecting ...") + if isinstance(address, tuple): connect = asyncio.open_connection(address[0], address[1], ssl=ssl) else: connect = asyncio.open_unix_connection(path=address, ssl=ssl) self._reader, self._writer = await connect + self.logger.debug("Connected.") + @upper_half async def _begin_new_session(self) -> None: """ @@ -343,8 +380,8 @@ async def _begin_new_session(self) -> None: """ assert self.runstate == Runstate.CONNECTING - reader_coro = self._bh_loop_forever(self._bh_recv_message) - writer_coro = self._bh_loop_forever(self._bh_send_message) + reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader') + writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer') self._reader_task = create_task(reader_coro) self._writer_task = create_task(writer_coro) @@ -374,6 +411,7 @@ def _schedule_disconnect(self, force: bool = False) -> None: terminating execution. When `True`, terminate immediately. """ if not self._dc_task: + self.logger.debug("scheduling disconnect.") self._dc_task = create_task(self._bh_disconnect(force)) @upper_half @@ -499,8 +537,13 @@ async def _bh_disconnect(self, force: bool = False) -> None: # This implicitly closes the reader, too. if self._writer: if not is_closing(self._writer): + self.logger.debug("Closing StreamWriter.") self._writer.close() + self.logger.debug("Waiting for writer to close.") await wait_closed(self._writer) + self.logger.debug("Writer closed.") + + self.logger.debug("Disconnected.") @bottom_half async def _bh_stop_writer(self, force: bool = False) -> None: @@ -513,17 +556,19 @@ async def _bh_stop_writer(self, force: bool = False) -> None: # Cancel the writer task. 
if self._writer_task and not self._writer_task.done(): + self.logger.debug("Cancelling writer task.") self._writer_task.cancel() await wait_task_done(self._writer_task) @bottom_half async def _bh_stop_reader(self) -> None: if self._reader_task and not self._reader_task.done(): + self.logger.debug("Cancelling reader task.") self._reader_task.cancel() await wait_task_done(self._reader_task) @bottom_half - async def _bh_loop_forever(self, async_fn: _TaskFN) -> None: + async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None: """ Run one of the bottom-half methods in a loop forever. @@ -531,16 +576,23 @@ async def _bh_loop_forever(self, async_fn: _TaskFN) -> None: disconnect that will terminate the entire loop. :param async_fn: The bottom-half method to run in a loop. + :param name: The name of this task, used for logging. """ try: while True: await async_fn() except asyncio.CancelledError: # We have been cancelled by _bh_disconnect, exit gracefully. + self.logger.debug("Task.%s: cancelled.", name) return except BaseException: + self.logger.error( + "Task.%s: failure:\n%s\n", name, pretty_traceback() + ) self._schedule_disconnect(force=True) raise + finally: + self.logger.debug("Task.%s: exiting.", name) @bottom_half async def _bh_send_message(self) -> None: diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py index 9ea91f2862..2311be5893 100644 --- a/python/qemu/aqmp/util.py +++ b/python/qemu/aqmp/util.py @@ -3,10 +3,14 @@ This module primarily provides compatibility wrappers for Python 3.6 to provide some features that otherwise become available in Python 3.7+. + +It additionally provides `pretty_traceback()`, used for formatting +tracebacks for inclusion in the logging stream. 
""" import asyncio import sys +import traceback from typing import ( Any, Coroutine, @@ -105,6 +109,34 @@ async def wait_task_done(task: Optional['asyncio.Future[Any]']) -> None: break +def pretty_traceback(prefix: str = " | ") -> str: + """ + Formats the current traceback, indented to provide visual distinction. + + This is useful for printing a traceback within a traceback for + debugging purposes when encapsulating errors to deliver them up the + stack; when those errors are printed, this helps provide a nice + visual grouping to quickly identify the parts of the error that + belong to the inner exception. + + :param prefix: The prefix to prepend to each line of the traceback. + :return: A string, formatted something like the following:: + + | Traceback (most recent call last): + | File "foobar.py", line 42, in arbitrary_example + | foo.baz() + | ArbitraryError: [Errno 42] Something bad happened! + """ + output = "".join(traceback.format_exception(*sys.exc_info())) + + exc_lines = [] + for line in output.split('\n'): + exc_lines.append(prefix + line) + + # The last line is always empty, omit it + return "\n".join(exc_lines[:-1]) + + def upper_half(func: T) -> T: """ Do-nothing decorator that annotates a method as an "upper-half" method. -- 2.31.1