ashb commented on code in PR #48949:
URL: https://github.com/apache/airflow/pull/48949#discussion_r2035334063
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -128,32 +124,26 @@
TerminalTIState.SUCCESS,
]
-
-@overload
-def mkpipe() -> tuple[socket, socket]: ...
-
-
-@overload
-def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+# Setting a fair buffer size here to handle most message sizes. Intention is to enforce a buffer size
+# that is big enough to handle small to medium messages while not enforcing hard latency issues
+BUFFER_SIZE = 4096
def mkpipe(
remote_read: bool = False,
-) -> tuple[socket, socket | BinaryIO]:
+) -> tuple[socket, socket]:
"""Create a pair of connected sockets."""
rsock, wsock = socketpair()
local, remote = (wsock, rsock) if remote_read else (rsock, wsock)
- local.setblocking(False)
-
- io: BinaryIO | socket
if remote_read:
- # If _we_ are writing, we don't want to buffer
- io = cast("BinaryIO", local.makefile("wb", buffering=0))
- else:
- io = local
+ # Setting a 4KB buffer here if possible, if not, it still works, so we will suppress all exceptions
+ with suppress(Exception):
+ local.setsockopt(SO_SNDBUF, SOL_SOCKET, BUFFER_SIZE)
+ # set nonblocking to True so that send or sendall waits till all data is sent
+ local.setblocking(True)
Review Comment:
Nope, it should be fine: this is in the supervisor, and we can't proceed
before we've written the entire message anyway. (And the client can't do
anything else on the socket until it has read the response.)
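
To illustrate the point, here is a minimal standalone sketch (not the code in this PR) of a blocking write on a socket pair, assuming a simple request/response exchange. The helper names `make_blocking_pair`, `recv_exactly` and `round_trip` are made up for the example. Note that `socket.setsockopt` takes the level (`SOL_SOCKET`) first and the option name (`SO_SNDBUF`) second.

```python
import socket
import threading

BUFFER_SIZE = 4096  # same value as in the diff; only a hint to the kernel


def make_blocking_pair() -> tuple[socket.socket, socket.socket]:
    """Return (supervisor_end, client_end) of a connected socket pair (hypothetical helper)."""
    supervisor_end, client_end = socket.socketpair()
    try:
        # Argument order is (level, optname, value).
        supervisor_end.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUFFER_SIZE)
    except OSError:
        # The buffer size is only an optimisation; carry on without it.
        pass
    # Blocking mode: sendall() returns only once every byte has been handed to the kernel.
    supervisor_end.setblocking(True)
    return supervisor_end, client_end


def recv_exactly(sock: socket.socket, size: int) -> bytes:
    """Read exactly `size` bytes, looping because recv() may return short reads."""
    chunks = []
    remaining = size
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise EOFError("socket closed before the full message arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def round_trip(payload: bytes) -> bytes:
    """Send `payload` from the supervisor end and return what the client end read."""
    supervisor_end, client_end = make_blocking_pair()
    received: list[bytes] = []

    def client() -> None:
        received.append(recv_exactly(client_end, len(payload)))

    reader = threading.Thread(target=client)
    reader.start()
    # Blocks while the socket buffer is full; the caller has nothing else to do meanwhile.
    supervisor_end.sendall(payload)
    reader.join()
    supervisor_end.close()
    client_end.close()
    return received[0]
```

A payload far larger than the send buffer still arrives intact, because `sendall` just waits for the reader to drain the socket. With a non-blocking socket the same call could raise `BlockingIOError` partway through and the caller would need its own retry loop.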