Add the ability to handle and route messages in qmp_protocol.py. The interface for actually sending anything still isn't added until next commit.
Signed-off-by: John Snow <js...@redhat.com> --- python/qemu/aqmp/qmp_protocol.py | 98 +++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/python/qemu/aqmp/qmp_protocol.py b/python/qemu/aqmp/qmp_protocol.py index 5872bfc017..04c8a8cb54 100644 --- a/python/qemu/aqmp/qmp_protocol.py +++ b/python/qemu/aqmp/qmp_protocol.py @@ -7,15 +7,18 @@ incoming connection from that server. """ +# The import workarounds here are fixed in the next commit. +import asyncio # pylint: disable=unused-import # noqa import logging from typing import ( Dict, List, Mapping, Optional, + Union, ) -from .error import ProtocolError +from .error import AQMPError, ProtocolError from .events import Events from .message import Message from .models import Greeting @@ -56,6 +59,53 @@ class NegotiationError(_WrappedProtocolError): """ +class ExecInterruptedError(AQMPError): + """ + Exception raised when an RPC is interrupted. + + This error is raised when an execute() statement could not be + completed. This can occur because the connection itself was + terminated before a reply was received. + + The true cause of the interruption will be available via `disconnect()`. + """ + + +class _MsgProtocolError(ProtocolError): + """ + Abstract error class for protocol errors that have a `Message` object. + + This Exception class is used for protocol errors where the `Message` + was mechanically understood, but was found to be inappropriate or + malformed. + + :param error_message: Human-readable string describing the error. + :param msg: The QMP `Message` that caused the error. + """ + def __init__(self, error_message: str, msg: Message): + super().__init__(error_message) + #: The received `Message` that caused the error. + self.msg: Message = msg + + def __str__(self) -> str: + return "\n".join([ + super().__str__(), + f" Message was: {str(self.msg)}\n", + ]) + + +class ServerParseError(_MsgProtocolError): + """ + The Server sent a `Message` indicating parsing failure. + + i.e. A reply has arrived from the server, but it is missing the "ID" + field, indicating a parsing error. + + :param error_message: Human-readable string describing the error. + :param msg: The QMP `Message` that caused the error. + """ + + class QMP(AsyncProtocol[Message], Events): """ Implements a QMP client connection. @@ -98,6 +148,9 @@ async def run(self, address='/tmp/qemu.socket'): #: Logger object used for debugging messages. logger = logging.getLogger(__name__) + # Type alias for pending execute() result items + _PendingT = Union[Message, ExecInterruptedError] + def __init__(self, name: Optional[str] = None) -> None: super().__init__(name) Events.__init__(self) @@ -112,6 +165,9 @@ def __init__(self, name: Optional[str] = None) -> None: # Cached Greeting, if one was awaited. self._greeting: Optional[Greeting] = None + # Incoming RPC reply messages + self._pending: Dict[str, 'asyncio.Queue[QMP._PendingT]'] = {} + @upper_half async def _begin_new_session(self) -> None: """ @@ -191,10 +247,27 @@ async def _negotiate(self) -> None: self.logger.error("%s:\n%s\n", emsg, pretty_traceback()) raise + @bottom_half + async def _bh_disconnect(self, force: bool = False) -> None: + await super()._bh_disconnect(force) + + if self._pending: + self.logger.debug("Cancelling pending executions") + keys = self._pending.keys() + for key in keys: + self.logger.debug("Cancelling execution '%s'", key) + self._pending[key].put_nowait( + ExecInterruptedError("Disconnected") + ) + + self.logger.debug("QMP Disconnected.") + @bottom_half async def _on_message(self, msg: Message) -> None: """ Add an incoming message to the appropriate queue/handler. + + :raise ServerParseError: When Message has no 'event' nor 'id' member """ # Incoming messages are not fully parsed/validated here; # do only light peeking to know how to route the messages. @@ -204,7 +277,27 @@ async def _on_message(self, msg: Message) -> None: return # Below, we assume everything left is an execute/exec-oob response. - # ... Which we'll implement in the next commit! + + if 'id' not in msg: + # This is (very likely) a server parsing error. + # It doesn't inherently belong to any pending execution. + # Instead of performing clever recovery, just terminate. + # See "NOTE" in qmp-spec.txt, section 2.4.2 + raise ServerParseError("Server sent a message without an ID," + " indicating parse failure.", msg) + + assert 'id' in msg + exec_id = str(msg['id']) + + if exec_id not in self._pending: + # qmp-spec.txt, section 2.4: + # 'Clients should drop all the responses + # that have an unknown "id" field.' + self.logger.warning("Unknown ID '%s', message dropped.", exec_id) + self.logger.debug("Unroutable message: %s", str(msg)) + return + + await self._pending[exec_id].put(msg) @upper_half @bottom_half @@ -237,6 +330,7 @@ def _do_send(self, msg: Message) -> None: def _cleanup(self) -> None: super()._cleanup() self._greeting = None + assert not self._pending @classmethod def make_execute_msg(cls, cmd: str, -- 2.31.1