ashb commented on code in PR #51699:
URL: https://github.com/apache/airflow/pull/51699#discussion_r2150423289


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -691,14 +690,58 @@ class TriggerDetails(TypedDict):
     events: int
 
 
[email protected](kw_only=True)
+class TriggerCommsDecoder(CommsDecoder[ToTriggerRunner, ToTriggerSupervisor]):
+    _async_writer: asyncio.StreamWriter = attrs.field(alias="async_writer")
+    _async_reader: asyncio.StreamReader = attrs.field(alias="async_reader")
+
+    body_decoder: TypeAdapter[ToTriggerRunner] = attrs.field(
+        factory=lambda: TypeAdapter(ToTriggerRunner), repr=False
+    )
+
+    _lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+
+    def _read_frame(self):
+        from asgiref.sync import async_to_sync
+
+        return async_to_sync(self._aread_frame)()
+
+    def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
+        from asgiref.sync import async_to_sync
+
+        return async_to_sync(self.asend)(msg)
+
+    async def _aread_frame(self):
+        len_bytes = await self._async_reader.readexactly(4)
+        len = int.from_bytes(len_bytes, byteorder="big")
+
+        buffer = await self._async_reader.readexactly(len)
+        return self.resp_decoder.decode(buffer)
+
+    async def _aget_response(self, expect_id: int) -> ToTriggerRunner | None:
+        frame = await self._aread_frame()
+        if frame.id != expect_id:
+            # Given the lock we take out in `asend`, this _shouldn't_ be 
possible, but I'd rather fail with
+            # this explicit error return the wrong type of message back to a 
Trigger
+            raise RuntimeError(f"Response read out of order! Got {frame.id=}, 
{expect_id=}")
+        return self._from_frame(frame)
+
+    async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
+        frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
+        bytes = frame.as_bytes()
+
+        async with self._lock:
+            self._async_writer.write(bytes)

Review Comment:
   I don't we _need_ it. The drain will happen in the background, and we can 
just as well wait on reading as block on the buffer being emptied and then 
waiting on reading. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to