gopidesupavan commented on code in PR #51699:
URL: https://github.com/apache/airflow/pull/51699#discussion_r2159634181
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -552,37 +542,52 @@ def _register_pipe_readers(self, stdout: socket, stderr:
socket, requests: socke
self.selector.register(
requests,
selectors.EVENT_READ,
- make_buffered_socket_reader(self.handle_requests(log),
on_close=self._on_socket_closed),
+ length_prefixed_frame_reader(self.handle_requests(log),
on_close=self._on_socket_closed),
)
- def _create_socket_handler(self, loggers, channel, log_level=logging.INFO)
-> Callable[[socket], bool]:
+ def _create_log_forwarder(self, loggers, channel, log_level=logging.INFO)
-> Callable[[socket], bool]:
"""Create a socket handler that forwards logs to a logger."""
return make_buffered_socket_reader(
forward_to_log(loggers, chan=channel, level=log_level),
on_close=self._on_socket_closed
)
- def _on_socket_closed(self):
+ def _on_socket_closed(self, sock: socket):
# We want to keep servicing this process until we've read up to EOF
from all the sockets.
- self._num_open_sockets -= 1
- def send_msg(self, msg: BaseModel, **dump_opts):
- """Send the given pydantic message to the subprocess at once by
encoding it and adding a line break."""
- b = msg.model_dump_json(**dump_opts).encode() + b"\n"
- self.stdin.sendall(b)
+ with suppress(KeyError):
+ self.selector.unregister(sock)
+ del self._open_sockets[sock]
+
+ def send_msg(
+ self, msg: BaseModel | None, request_id: int, error: ErrorResponse |
None = None, **dump_opts
+ ):
+ """
+ Send the msg as a length-prefixed response frame.
+
+ ``request_id`` is the ID that the client sent in it's request, and has
no meaning to the server
+
+ """
+ if msg:
+ frame = _ResponseFrame(id=request_id,
body=msg.model_dump(**dump_opts))
+ else:
+ err_resp = error.model_dump() if error else None
+ frame = _ResponseFrame(id=request_id, error=err_resp)
+
+ self.stdin.sendall(frame.as_bytes())
- def handle_requests(self, log: FilteringBoundLogger) -> Generator[None,
bytes, None]:
+ def handle_requests(self, log: FilteringBoundLogger) -> Generator[None,
_RequestFrame, None]:
"""Handle incoming requests from the task process, respond with the
appropriate data."""
while True:
- line = yield
+ request = yield
try:
- msg = self.decoder.validate_json(line)
+ msg = self.decoder.validate_python(request.body)
except Exception:
- log.exception("Unable to decode message", line=line)
+ log.exception("Unable to decode message", body=request.body)
continue
Review Comment:
Why we are continuing? i think if we unable to decode we should respond ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]