kaxil commented on code in PR #48949:
URL: https://github.com/apache/airflow/pull/48949#discussion_r2035251955
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -128,32 +124,26 @@
TerminalTIState.SUCCESS,
]
-
-@overload
-def mkpipe() -> tuple[socket, socket]: ...
-
-
-@overload
-def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+# Setting a fair buffer size here to handle most message sizes. Intention is
to enforce a buffer size
+# that is big enough to handle small to medium messages while not enforcing
hard latency issues
+BUFFER_SIZE = 4096
def mkpipe(
remote_read: bool = False,
-) -> tuple[socket, socket | BinaryIO]:
+) -> tuple[socket, socket]:
"""Create a pair of connected sockets."""
rsock, wsock = socketpair()
local, remote = (wsock, rsock) if remote_read else (rsock, wsock)
- local.setblocking(False)
-
- io: BinaryIO | socket
if remote_read:
- # If _we_ are writing, we don't want to buffer
- io = cast("BinaryIO", local.makefile("wb", buffering=0))
- else:
- io = local
+ # Setting a 4KB buffer here if possible, if not, it still works, so we
will suppress all exceptions
+ with suppress(Exception):
+ local.setsockopt(SO_SNDBUF, SOL_SOCKET, BUFFER_SIZE)
+ # set nonblocking to True so that send or sendall waits till all data
is sent
+ local.setblocking(True)
Review Comment:
Do we have any latency concerns at scale due to this?
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -128,32 +124,26 @@
TerminalTIState.SUCCESS,
]
-
-@overload
-def mkpipe() -> tuple[socket, socket]: ...
-
-
-@overload
-def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+# Setting a fair buffer size here to handle most message sizes. Intention is
to enforce a buffer size
+# that is big enough to handle small to medium messages while not enforcing
hard latency issues
+BUFFER_SIZE = 4096
Review Comment:
Is it worth documenting it somewhere that message above 4kb will still fail?
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -128,32 +124,26 @@
TerminalTIState.SUCCESS,
]
-
-@overload
-def mkpipe() -> tuple[socket, socket]: ...
-
-
-@overload
-def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+# Setting a fair buffer size here to handle most message sizes. Intention is
to enforce a buffer size
+# that is big enough to handle small to medium messages while not enforcing
hard latency issues
+BUFFER_SIZE = 4096
Review Comment:
or do we want to make this configurable?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]