jason810496 commented on code in PR #65958:
URL: https://github.com/apache/airflow/pull/65958#discussion_r3271856268
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -730,35 +733,51 @@ def _create_log_forwarder(self, loggers, name,
log_level=logging.INFO) -> Callab
def _on_socket_closed(self, sock: socket):
# We want to keep servicing this process until we've read up to EOF
from all the sockets.
-
with suppress(KeyError):
self.selector.unregister(sock)
del self._open_sockets[sock]
+ def _serialize_response(self, msg: BaseModel | ErrorResponse, **dump_opts)
-> dict[str, Any]:
+ if self._subprocess_schema_version is not None:
+ migrator = get_schema_version_migrator()
+ msg = migrator.downgrade(msg, self._subprocess_schema_version,
dump_kwargs=dump_opts)
+ return msg.model_dump(**dump_opts)
+
def send_msg(
- self, msg: BaseModel | None, request_id: int, error: ErrorResponse |
None = None, **dump_opts
+ self,
+ msg: BaseModel | None,
+ request_id: int,
+ error: ErrorResponse | None = None,
+ **dump_opts,
):
"""
Send the msg as a length-prefixed response frame.
- ``request_id`` is the ID that the client sent in it's request, and has
no meaning to the server
-
+ :param request_id: The ID sent in the request by the client. This has
no
+ meaning to the server, and is only included in the response frame
+ for the client to identify what the response is for.
"""
if msg:
- frame = _ResponseFrame(id=request_id,
body=msg.model_dump(**dump_opts))
+ frame = _ResponseFrame(id=request_id,
body=self._serialize_response(msg, **dump_opts))
else:
- err_resp = error.model_dump() if error else None
+ err_resp = self._serialize_response(error) if error else None
frame = _ResponseFrame(id=request_id, error=err_resp)
-
self.stdin.sendall(frame.as_bytes())
+ def _deserialize_request(self, body: dict[str, Any] | None) -> dict[str,
Any] | None:
Review Comment:
Regarding the naming of `_deserialize_request` and `_serialize_response`:
would it be better to use names that include "migrate" or "compatible",
since those methods are really migrating the schema?
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -548,6 +549,8 @@ class WatchedSubprocess:
_process: psutil.Process = attrs.field(repr=False)
"""File descriptor for request handling."""
+ _subprocess_schema_version: str | None = None
Review Comment:
Nice naming; it's concise and clear.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org