Give the connection and the reader/writer tasks nicknames, and add
logging statements throughout.

Signed-off-by: John Snow <js...@redhat.com>
---
 python/qemu/aqmp/protocol.py | 64 ++++++++++++++++++++++++++++++++----
 python/qemu/aqmp/util.py     | 32 ++++++++++++++++++
 2 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index a99a191982..dd8564ee02 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -14,6 +14,7 @@
 from asyncio import StreamReader, StreamWriter
 from enum import Enum
 from functools import wraps
+import logging
 from ssl import SSLContext
 from typing import (
     Any,
@@ -34,6 +35,7 @@
     create_task,
     flush,
     is_closing,
+    pretty_traceback,
     upper_half,
     wait_closed,
     wait_task_done,
@@ -174,14 +176,28 @@ class AsyncProtocol(Generic[T]):
          can be written after the super() call.
      - `_on_message`:
          Actions to be performed when a message is received.
+
+    :param name:
+        Name used for logging messages, if any. By default, messages
+        will log to 'qemu.aqmp.protocol', but each individual connection
+        can be given its own logger by giving it a name; messages will
+        then log to 'qemu.aqmp.protocol.${name}'.
     """
     # pylint: disable=too-many-instance-attributes
 
+    #: Logger object for debugging messages from this connection.
+    logger = logging.getLogger(__name__)
+
     # -------------------------
     # Section: Public interface
     # -------------------------
 
-    def __init__(self) -> None:
+    def __init__(self, name: Optional[str] = None) -> None:
+        #: The nickname for this connection, if any.
+        self.name: Optional[str] = name
+        if self.name is not None:
+            self.logger = self.logger.getChild(self.name)
+
         # stream I/O
         self._reader: Optional[StreamReader] = None
         self._writer: Optional[StreamWriter] = None
@@ -212,6 +228,15 @@ def __init__(self) -> None:
         #: An `asyncio.Event` that signals when `runstate` is changed.
         self.runstate_changed: asyncio.Event = asyncio.Event()
 
+    def __repr__(self) -> str:
+        argstr = ''
+        if self.name is not None:
+            argstr += f"name={self.name}"
+        return "{:s}({:s})".format(
+            type(self).__name__,
+            argstr,
+        )
+
     @property
     def runstate(self) -> Runstate:
         """The current `Runstate` of the connection."""
@@ -301,6 +326,8 @@ async def _new_session(self,
         assert self.runstate == Runstate.IDLE
         self._set_state(Runstate.CONNECTING)
 
+        if not self._outgoing.empty():
+            self.logger.warning("Outgoing message queue was not empty!")
         self._outgoing = asyncio.Queue()
 
         phase = "connection"
@@ -311,9 +338,15 @@ async def _new_session(self,
             await self._begin_new_session()
 
         except Exception as err:
-            # Reset from CONNECTING back to IDLE.
-            await self.disconnect()
             emsg = f"Failed to establish {phase}"
+            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+            try:
+                # Reset from CONNECTING back to IDLE.
+                await self.disconnect()
+            except:
+                emsg = "Unexpected bottom half exceptions"
+                self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+                raise
             raise ConnectError(emsg, err) from err
 
         assert self.runstate == Runstate.RUNNING
@@ -330,12 +363,16 @@ async def _do_connect(self, address: Union[str, Tuple[str, int]],
 
         :raise OSError: For stream-related errors.
         """
+        self.logger.debug("Connecting ...")
+
         if isinstance(address, tuple):
             connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
         else:
             connect = asyncio.open_unix_connection(path=address, ssl=ssl)
         self._reader, self._writer = await connect
 
+        self.logger.debug("Connected.")
+
     @upper_half
     async def _begin_new_session(self) -> None:
         """
@@ -343,8 +380,8 @@ async def _begin_new_session(self) -> None:
         """
         assert self.runstate == Runstate.CONNECTING
 
-        reader_coro = self._bh_loop_forever(self._bh_recv_message)
-        writer_coro = self._bh_loop_forever(self._bh_send_message)
+        reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
+        writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
 
         self._reader_task = create_task(reader_coro)
         self._writer_task = create_task(writer_coro)
@@ -374,6 +411,7 @@ def _schedule_disconnect(self, force: bool = False) -> None:
             terminating execution. When `True`, terminate immediately.
         """
         if not self._dc_task:
+            self.logger.debug("scheduling disconnect.")
             self._dc_task = create_task(self._bh_disconnect(force))
 
     @upper_half
@@ -499,8 +537,13 @@ async def _bh_disconnect(self, force: bool = False) -> None:
         # This implicitly closes the reader, too.
         if self._writer:
             if not is_closing(self._writer):
+                self.logger.debug("Closing StreamWriter.")
                 self._writer.close()
+            self.logger.debug("Waiting for writer to close.")
             await wait_closed(self._writer)
+            self.logger.debug("Writer closed.")
+
+        self.logger.debug("Disconnected.")
 
     @bottom_half
     async def _bh_stop_writer(self, force: bool = False) -> None:
@@ -513,17 +556,19 @@ async def _bh_stop_writer(self, force: bool = False) -> None:
 
         # Cancel the writer task.
         if self._writer_task and not self._writer_task.done():
+            self.logger.debug("Cancelling writer task.")
             self._writer_task.cancel()
         await wait_task_done(self._writer_task)
 
     @bottom_half
     async def _bh_stop_reader(self) -> None:
         if self._reader_task and not self._reader_task.done():
+            self.logger.debug("Cancelling reader task.")
             self._reader_task.cancel()
         await wait_task_done(self._reader_task)
 
     @bottom_half
-    async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
+    async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
         """
         Run one of the bottom-half methods in a loop forever.
 
@@ -531,16 +576,23 @@ async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
         disconnect that will terminate the entire loop.
 
         :param async_fn: The bottom-half method to run in a loop.
+        :param name: The name of this task, used for logging.
         """
         try:
             while True:
                 await async_fn()
         except asyncio.CancelledError:
             # We have been cancelled by _bh_disconnect, exit gracefully.
+            self.logger.debug("Task.%s: cancelled.", name)
             return
         except BaseException:
+            self.logger.error(
+                "Task.%s: failure:\n%s\n", name, pretty_traceback()
+            )
             self._schedule_disconnect(force=True)
             raise
+        finally:
+            self.logger.debug("Task.%s: exiting.", name)
 
     @bottom_half
     async def _bh_send_message(self) -> None:
diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
index 9ea91f2862..2311be5893 100644
--- a/python/qemu/aqmp/util.py
+++ b/python/qemu/aqmp/util.py
@@ -3,10 +3,14 @@
 
 This module primarily provides compatibility wrappers for Python 3.6 to
 provide some features that otherwise become available in Python 3.7+.
+
+It additionally provides `pretty_traceback()`, used for formatting
+tracebacks for inclusion in the logging stream.
 """
 
 import asyncio
 import sys
+import traceback
 from typing import (
     Any,
     Coroutine,
@@ -105,6 +109,34 @@ async def wait_task_done(task: Optional['asyncio.Future[Any]']) -> None:
             break
 
 
+def pretty_traceback(prefix: str = "  | ") -> str:
+    """
+    Formats the current traceback, indented to provide visual distinction.
+
+    This is useful for printing a traceback within a traceback for
+    debugging purposes when encapsulating errors to deliver them up the
+    stack; when those errors are printed, this helps provide a nice
+    visual grouping to quickly identify the parts of the error that
+    belong to the inner exception.
+
+    :param prefix: The prefix to prepend to each line of the traceback.
+    :return: A string, formatted something like the following::
+
+      | Traceback (most recent call last):
+      |   File "foobar.py", line 42, in arbitrary_example
+      |     foo.baz()
+      | ArbitraryError: [Errno 42] Something bad happened!
+    """
+    output = "".join(traceback.format_exception(*sys.exc_info()))
+
+    exc_lines = []
+    for line in output.split('\n'):
+        exc_lines.append(prefix + line)
+
+    # The last line is always empty, omit it
+    return "\n".join(exc_lines[:-1])
+
+
 def upper_half(func: T) -> T:
     """
     Do-nothing decorator that annotates a method as an "upper-half" method.
-- 
2.31.1


Reply via email to