ashb commented on code in PR #51699:
URL: https://github.com/apache/airflow/pull/51699#discussion_r2150870075
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -691,14 +690,58 @@ class TriggerDetails(TypedDict):
events: int
+@attrs.define(kw_only=True)
+class TriggerCommsDecoder(CommsDecoder[ToTriggerRunner, ToTriggerSupervisor]):
+ _async_writer: asyncio.StreamWriter = attrs.field(alias="async_writer")
+ _async_reader: asyncio.StreamReader = attrs.field(alias="async_reader")
+
+ body_decoder: TypeAdapter[ToTriggerRunner] = attrs.field(
+ factory=lambda: TypeAdapter(ToTriggerRunner), repr=False
+ )
+
+ _lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+
+ def _read_frame(self):
+ from asgiref.sync import async_to_sync
+
+ return async_to_sync(self._aread_frame)()
+
+ def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
+ from asgiref.sync import async_to_sync
+
+ return async_to_sync(self.asend)(msg)
+
+ async def _aread_frame(self):
+ len_bytes = await self._async_reader.readexactly(4)
+ len = int.from_bytes(len_bytes, byteorder="big")
+
+ buffer = await self._async_reader.readexactly(len)
+ return self.resp_decoder.decode(buffer)
Review Comment:
It's async, so it'll "hang" in the sense that it will block the current task
until the response is available, but it's async so other triggers will continue
to run.
I still don't think I've understood your question though
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]