Author: Pavel Labath Date: 2020-12-07T09:24:13+01:00 New Revision: a2f4f7daf76c767efd668390bc7f8b99bdb1218c
URL: https://github.com/llvm/llvm-project/commit/a2f4f7daf76c767efd668390bc7f8b99bdb1218c DIFF: https://github.com/llvm/llvm-project/commit/a2f4f7daf76c767efd668390bc7f8b99bdb1218c.diff LOG: [lldb/test] Refactor socket_packet_pump Now that the class does not use a thread, the name is no longer appropriate. Rename the class to "Server" and make it a long-lived object (instead of recreating it for every expect_gdbremote_sequence call). The idea is to make this class a wrapper for all communication with debug/lldb-server. This will enable some additional cleanups as we had some duplication between socket_pump non-pump code paths. Also squeeze in some small improvements: - use python-level timeouts on sockets instead of the manual select calls - use byte arrays instead of strings when working with raw packets Added: Modified: lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py Removed: lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py ################################################################################ diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py index 2f278289988c..d7bfb7fbda32 100644 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py +++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py @@ -87,7 +87,6 @@ def setUp(self): self.setUpBaseLogging() self.debug_monitor_extra_args = [] - self._pump_queues = socket_packet_pump.PumpQueues() if self.isVerboseLoggingRequested(): # If requested, full logs go to a log file @@ -118,8 +117,6 @@ def setUp(self): self.stub_hostname = "localhost" def tearDown(self): - self._pump_queues.verify_queues_empty() - self.logger.removeHandler(self._verbose_log_handler) self._verbose_log_handler = None TestBase.tearDown(self) @@ -342,6 +339,7 @@ def launch_debug_monitor(self, attach_pid=None, logfile=None): if self.reverse_connect: self.sock = sock.accept()[0] + self.sock.settimeout(self.DEFAULT_TIMEOUT) return server @@ -354,6 +352,7 @@ def connect_to_debug_monitor(self, attach_pid=None): # Schedule debug monitor to be shut down during teardown. logger = self.logger + self._server = Server(self.sock, server) return server # We're using a random port algorithm to try not to collide with other ports, @@ -375,6 +374,7 @@ def connect_to_debug_monitor(self, attach_pid=None): try: logger.info("Connect attempt %d", connect_attemps + 1) self.sock = self.create_socket() + self._server = Server(self.sock, server) return server except _ConnectionRefused as serr: # Ignore, and try again. @@ -632,9 +632,8 @@ def parse_register_info_packets(self, context): def expect_gdbremote_sequence(self): return expect_lldb_gdbserver_replay( self, - self.sock, + self._server, self.test_sequence, - self._pump_queues, self.DEFAULT_TIMEOUT * len(self.test_sequence), self.logger) diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py index b5c635a77b5c..07136108b2a4 100644 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py +++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py @@ -3,19 +3,18 @@ from __future__ import division, print_function - +import binascii import os import os.path import platform import re import six -import socket_packet_pump +import socket import subprocess +from lldbsuite.support import seven from lldbsuite.test.lldbtest import * from lldbsuite.test import configuration - -from six.moves import queue - +from textwrap import dedent def _get_debug_monitor_from_lldb(lldb_exe, debug_monitor_basename): """Return the debug monitor exe path given the lldb exe path. @@ -165,9 +164,8 @@ def assert_packets_equal(asserter, actual_packet, expected_packet): def expect_lldb_gdbserver_replay( asserter, - sock, + server, test_sequence, - pump_queues, timeout_seconds, logger=None): """Replay socket communication with lldb-gdbserver and verify responses. @@ -175,8 +173,6 @@ def expect_lldb_gdbserver_replay( Args: asserter: the object providing assertEqual(first, second, msg=None), e.g. TestCase instance. - sock: the TCP socket connected to the lldb-gdbserver exe. - test_sequence: a GdbRemoteTestSequence instance that describes the messages sent to the gdb remote and the responses expected from it. @@ -207,75 +203,62 @@ def expect_lldb_gdbserver_replay( return {} context = {"O_count": 0, "O_content": ""} - with socket_packet_pump.SocketPacketPump(sock, pump_queues, logger) as pump: - # Grab the first sequence entry. - sequence_entry = test_sequence.entries.pop(0) - - # While we have an active sequence entry, send messages - # destined for the stub and collect/match/process responses - # expected from the stub. - while sequence_entry: - if sequence_entry.is_send_to_remote(): - # This is an entry to send to the remote debug monitor. - send_packet = sequence_entry.get_send_packet() - if logger: - if len(send_packet) == 1 and send_packet[0] == chr(3): - packet_desc = "^C" - else: - packet_desc = send_packet - logger.info( - "sending packet to remote: {}".format(packet_desc)) - sock.sendall(send_packet.encode()) - else: - # This is an entry expecting to receive content from the remote - # debug monitor. - # We'll pull from (and wait on) the queue appropriate for the type of matcher. - # We keep separate queues for process output (coming from non-deterministic - # $O packet division) and for all other packets. - if sequence_entry.is_output_matcher(): - try: - # Grab next entry from the output queue. - content = pump.get_output(timeout_seconds) - except queue.Empty: - if logger: - logger.warning( - "timeout waiting for stub output (accumulated output:{})".format( - pump.get_accumulated_output())) - raise Exception( - "timed out while waiting for output match (accumulated output: {})".format( - pump.get_accumulated_output())) + # Grab the first sequence entry. + sequence_entry = test_sequence.entries.pop(0) + + # While we have an active sequence entry, send messages + # destined for the stub and collect/match/process responses + # expected from the stub. + while sequence_entry: + if sequence_entry.is_send_to_remote(): + # This is an entry to send to the remote debug monitor. + send_packet = sequence_entry.get_send_packet() + if logger: + if len(send_packet) == 1 and send_packet[0] == chr(3): + packet_desc = "^C" else: - try: - content = pump.get_packet(timeout_seconds) - except queue.Empty: - if logger: - logger.warning( - "timeout waiting for packet match (receive buffer: {})".format( - pump.get_receive_buffer())) - raise Exception( - "timed out while waiting for packet match (receive buffer: {})".format( - pump.get_receive_buffer())) - - # Give the sequence entry the opportunity to match the content. - # Output matchers might match or pass after more output accumulates. - # Other packet types generally must match. - asserter.assertIsNotNone(content) - context = sequence_entry.assert_match( - asserter, content, context=context) - - # Move on to next sequence entry as needed. Some sequence entries support executing multiple - # times in diff erent states (for looping over query/response - # packets). - if sequence_entry.is_consumed(): - if len(test_sequence.entries) > 0: - sequence_entry = test_sequence.entries.pop(0) + packet_desc = send_packet + logger.info( + "sending packet to remote: {}".format(packet_desc)) + server.send_raw(send_packet.encode()) + else: + # This is an entry expecting to receive content from the remote + # debug monitor. + + # We'll pull from (and wait on) the queue appropriate for the type of matcher. + # We keep separate queues for process output (coming from non-deterministic + # $O packet division) and for all other packets. + try: + if sequence_entry.is_output_matcher(): + # Grab next entry from the output queue. + content = server.get_raw_output_packet() else: - sequence_entry = None + content = server.get_raw_normal_packet() + content = seven.bitcast_to_string(content) + except socket.timeout: + asserter.fail( + "timed out while waiting for '{}':\n{}".format(sequence_entry, server)) + + # Give the sequence entry the opportunity to match the content. + # Output matchers might match or pass after more output accumulates. + # Other packet types generally must match. + asserter.assertIsNotNone(content) + context = sequence_entry.assert_match( + asserter, content, context=context) + + # Move on to next sequence entry as needed. Some sequence entries support executing multiple + # times in diff erent states (for looping over query/response + # packets). + if sequence_entry.is_consumed(): + if len(test_sequence.entries) > 0: + sequence_entry = test_sequence.entries.pop(0) + else: + sequence_entry = None - # Fill in the O_content entries. - context["O_count"] = 1 - context["O_content"] = pump.get_accumulated_output() + # Fill in the O_content entries. + context["O_count"] = 1 + context["O_content"] = server.consume_accumulated_output() return context @@ -950,9 +933,99 @@ def process_is_running(pid, unknown_value=True): # Check if the pid is in the process_ids return pid in process_ids -if __name__ == '__main__': - EXE_PATH = get_lldb_server_exe() - if EXE_PATH: - print("lldb-server path detected: {}".format(EXE_PATH)) +def _handle_output_packet_string(packet_contents): + if (not packet_contents) or (len(packet_contents) < 1): + return None + elif packet_contents[0:1] != b"O": + return None + elif packet_contents == b"OK": + return None else: - print("lldb-server could not be found") + return binascii.unhexlify(packet_contents[1:]) + +class Server(object): + + _GDB_REMOTE_PACKET_REGEX = re.compile(br'^\$([^\#]*)#[0-9a-fA-F]{2}') + + class ChecksumMismatch(Exception): + pass + + def __init__(self, sock, proc = None): + self._accumulated_output = b"" + self._receive_buffer = b"" + self._normal_queue = [] + self._output_queue = [] + self._sock = sock + self._proc = proc + + def send_raw(self, frame): + self._sock.sendall(frame) + + def _read(self, q): + while not q: + new_bytes = self._sock.recv(4096) + self._process_new_bytes(new_bytes) + return q.pop(0) + + def _process_new_bytes(self, new_bytes): + # Add new bytes to our accumulated unprocessed packet bytes. + self._receive_buffer += new_bytes + + # Parse fully-formed packets into individual packets. + has_more = len(self._receive_buffer) > 0 + while has_more: + if len(self._receive_buffer) <= 0: + has_more = False + # handle '+' ack + elif self._receive_buffer[0:1] == b"+": + self._normal_queue += [b"+"] + self._receive_buffer = self._receive_buffer[1:] + else: + packet_match = self._GDB_REMOTE_PACKET_REGEX.match( + self._receive_buffer) + if packet_match: + # Our receive buffer matches a packet at the + # start of the receive buffer. + new_output_content = _handle_output_packet_string( + packet_match.group(1)) + if new_output_content: + # This was an $O packet with new content. + self._accumulated_output += new_output_content + self._output_queue += [self._accumulated_output] + else: + # Any packet other than $O. + self._normal_queue += [packet_match.group(0)] + + # Remove the parsed packet from the receive + # buffer. + self._receive_buffer = self._receive_buffer[ + len(packet_match.group(0)):] + else: + # We don't have enough in the receive bufferto make a full + # packet. Stop trying until we read more. + has_more = False + + def get_raw_output_packet(self): + return self._read(self._output_queue) + + def get_raw_normal_packet(self): + return self._read(self._normal_queue) + + def get_accumulated_output(self): + return self._accumulated_output + + def consume_accumulated_output(self): + output = self._accumulated_output + self._accumulated_output = b"" + return output + + def __str__(self): + return dedent("""\ + server '{}' on '{}' + _receive_buffer: {} + _normal_queue: {} + _output_queue: {} + _accumulated_output: {} + """).format(self._proc, self._sock, self._receive_buffer, + self._normal_queue, self._output_queue, + self._accumulated_output) diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py deleted file mode 100644 index 6c41ed473b45..000000000000 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py +++ /dev/null @@ -1,181 +0,0 @@ - -from __future__ import print_function - - -import re -import select -import threading -import time -import traceback - -from six.moves import queue -from lldbsuite.support import seven - - -def _handle_output_packet_string(packet_contents): - if (not packet_contents) or (len(packet_contents) < 1): - return None - elif packet_contents[0] != "O": - return None - elif packet_contents == "OK": - return None - else: - return seven.unhexlify(packet_contents[1:]) - - -def _dump_queue(the_queue): - while not the_queue.empty(): - print(repr(the_queue.get(True))) - print("\n") - - -class PumpQueues(object): - - def __init__(self): - self._output_queue = queue.Queue() - self._packet_queue = queue.Queue() - - def output_queue(self): - return self._output_queue - - def packet_queue(self): - return self._packet_queue - - def verify_queues_empty(self): - # Warn if there is any content left in any of the queues. - # That would represent unmatched packets. - if not self.output_queue().empty(): - print("warning: output queue entries still exist:") - _dump_queue(self.output_queue()) - print("from here:") - traceback.print_stack() - - if not self.packet_queue().empty(): - print("warning: packet queue entries still exist:") - _dump_queue(self.packet_queue()) - print("from here:") - traceback.print_stack() - - -class SocketPacketPump(object): - """A threaded packet reader that partitions packets into two streams. - - All incoming $O packet content is accumulated with the current accumulation - state put into the OutputQueue. - - All other incoming packets are placed in the packet queue. - - A select thread can be started and stopped, and runs to place packet - content into the two queues. - """ - - _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}') - - def __init__(self, pump_socket, pump_queues, logger=None): - if not pump_socket: - raise Exception("pump_socket cannot be None") - - self._socket = pump_socket - self._logger = logger - self._receive_buffer = "" - self._accumulated_output = "" - self._pump_queues = pump_queues - - def __enter__(self): - self._receive_buffer = "" - self._accumulated_output = "" - return self - - def __exit__(self, exit_type, value, the_traceback): - pass - - def _read(self, timeout_seconds, q): - now = time.monotonic() - deadline = now + timeout_seconds - while q.empty() and now <= deadline: - can_read, _, _ = select.select([self._socket], [], [], deadline-now) - now = time.monotonic() - if can_read and self._socket in can_read: - try: - new_bytes = seven.bitcast_to_string(self._socket.recv(4096)) - if self._logger and new_bytes and len(new_bytes) > 0: - self._logger.debug( - "pump received bytes: {}".format(new_bytes)) - except: - # Likely a closed socket. Done with the pump thread. - if self._logger: - self._logger.debug( - "socket read failed, stopping pump read thread\n" + - traceback.format_exc(3)) - break - self._process_new_bytes(new_bytes) - if q.empty(): - raise queue.Empty() - return q.get(True) - - def get_output(self, timeout_seconds): - return self._read(timeout_seconds, self._pump_queues.output_queue()) - - def get_packet(self, timeout_seconds): - return self._read(timeout_seconds, self._pump_queues.packet_queue()) - - def _process_new_bytes(self, new_bytes): - if not new_bytes: - return - if len(new_bytes) < 1: - return - - # Add new bytes to our accumulated unprocessed packet bytes. - self._receive_buffer += new_bytes - - # Parse fully-formed packets into individual packets. - has_more = len(self._receive_buffer) > 0 - while has_more: - if len(self._receive_buffer) <= 0: - has_more = False - # handle '+' ack - elif self._receive_buffer[0] == "+": - self._pump_queues.packet_queue().put("+") - self._receive_buffer = self._receive_buffer[1:] - if self._logger: - self._logger.debug( - "parsed packet from stub: +\n" + - "new receive_buffer: {}".format( - self._receive_buffer)) - else: - packet_match = self._GDB_REMOTE_PACKET_REGEX.match( - self._receive_buffer) - if packet_match: - # Our receive buffer matches a packet at the - # start of the receive buffer. - new_output_content = _handle_output_packet_string( - packet_match.group(1)) - if new_output_content: - # This was an $O packet with new content. - self._accumulated_output += new_output_content - self._pump_queues.output_queue().put(self._accumulated_output) - else: - # Any packet other than $O. - self._pump_queues.packet_queue().put(packet_match.group(0)) - - # Remove the parsed packet from the receive - # buffer. - self._receive_buffer = self._receive_buffer[ - len(packet_match.group(0)):] - if self._logger: - self._logger.debug( - "parsed packet from stub: " + - packet_match.group(0)) - self._logger.debug( - "new receive_buffer: " + - self._receive_buffer) - else: - # We don't have enough in the receive bufferto make a full - # packet. Stop trying until we read more. - has_more = False - - def get_accumulated_output(self): - return self._accumulated_output - - def get_receive_buffer(self): - return self._receive_buffer diff --git a/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py b/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py index 94e628c811af..5eec3c44f67a 100644 --- a/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py +++ b/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py @@ -27,6 +27,7 @@ def init_lldb_server(self): self.stub_hostname = "localhost" self.port = int(lldbutil.wait_for_file_on_target(self, port_file)) self.sock = self.create_socket() + self._server = Server(self.sock, server) self.add_no_ack_remote_stream() _______________________________________________ llvm-branch-commits mailing list llvm-branch-commits@lists.llvm.org https://lists.llvm.org/cgi-bin/mailman/listinfo/llvm-branch-commits