================
@@ -614,56 +645,70 @@ def _send_recv(self, request: Request) -> Response:
self.validate_response(request, response)
return response
- def receive_response(self, seq: int) -> Optional[Response]:
+ def receive_response(self, seq: int) -> Response:
"""Waits for a response with the associated request_sec."""
def predicate(p: ProtocolMessage):
return p["type"] == "response" and p["request_seq"] == seq
- return cast(Optional[Response], self._recv_packet(predicate=predicate))
+ return cast(Response, self._recv_packet(predicate=predicate))
- def wait_for_event(self, filter: List[str] = []) -> Optional[Event]:
+ def wait_for_event(self, filter: List[str]) -> Event:
"""Wait for the first event that matches the filter."""
def predicate(p: ProtocolMessage):
return p["type"] == "event" and p["event"] in filter
- return cast(
- Optional[Event],
- self._recv_packet(predicate=predicate),
- )
+ return cast(Event, self._recv_packet(predicate=predicate))
- def wait_for_stopped(self) -> Optional[List[Event]]:
- stopped_events = []
- stopped_event = self.wait_for_event(filter=["stopped", "exited"])
- while stopped_event:
- stopped_events.append(stopped_event)
- # If we exited, then we are done
- if stopped_event["event"] == "exited":
+ def collect_events(self, filter: List[str]) -> List[Event]:
+ """Wait for the first event that matches the filter."""
+ events = []
+
+ def predicate(p: ProtocolMessage):
+ return p["type"] == "event" and p["event"] in filter
+
+ event = cast(Event, self._recv_packet(predicate=predicate))
+ while event:
+ events.append(event)
+ try:
+ event = cast(
+ Event,
+ self._recv_packet(predicate=predicate,
timeout=EVENT_QUIET_PERIOD),
+ )
+ except TimeoutError:
break
+ return events
+
+ def wait_for_initialized(self):
+ """Wait for the debugger to become initialized."""
+ if self.initialized:
+ return
+ self.wait_for_event(["initialized"])
- # Otherwise we stopped and there might be one or more 'stopped'
- # events for each thread that stopped with a reason, so keep
- # checking for more 'stopped' events and return all of them
- # Use a shorter timeout for additional stopped events
- def predicate(p: ProtocolMessage):
- return p["type"] == "event" and p["event"] in ["stopped",
"exited"]
+ def ensure_initialized(self):
+ """Validates that we can wait for initialized."""
+ if self.initialized:
+ return
+ assert self.launch_or_attach_sent, "launch or attach request not yet
sent"
+ assert (
+ not self.configuration_done_sent
+ ), "configuration done has already been sent, 'initialized' should
have already occurred"
+ self.wait_for_initialized()
- stopped_event = cast(
- Optional[Event], self._recv_packet(predicate=predicate,
timeout=0.25)
- )
- return stopped_events
+ def wait_for_stopped(self) -> List[Event]:
+ """Wait for the next 'stopped' event to occur, coalescing all stopped
events within a given quiet period."""
+ return self.collect_events(["stopped", "exited"])
+
+ def wait_for_module_events(self) -> List[Event]:
+ """Wait for the next 'module' event to occur, coalescing all module
events within a given quiet period."""
+ return self.collect_events(["module"])
def wait_for_breakpoint_events(self):
----------------
DrSergei wrote:
Could you add type hint here
https://github.com/llvm/llvm-project/pull/178041
_______________________________________________
lldb-commits mailing list
[email protected]
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits