kaxil commented on code in PR #48949:
URL: https://github.com/apache/airflow/pull/48949#discussion_r2035251955


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -128,32 +124,26 @@
     TerminalTIState.SUCCESS,
 ]
 
-
-@overload
-def mkpipe() -> tuple[socket, socket]: ...
-
-
-@overload
-def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+# Setting a fair buffer size here to handle most message sizes. Intention is to enforce a buffer size
+# that is big enough to handle small to medium messages while not enforcing hard latency issues
+BUFFER_SIZE = 4096
 
 
 def mkpipe(
     remote_read: bool = False,
-) -> tuple[socket, socket | BinaryIO]:
+) -> tuple[socket, socket]:
     """Create a pair of connected sockets."""
     rsock, wsock = socketpair()
     local, remote = (wsock, rsock) if remote_read else (rsock, wsock)
 
-    local.setblocking(False)
-
-    io: BinaryIO | socket
     if remote_read:
-        # If _we_ are writing, we don't want to buffer
-        io = cast("BinaryIO", local.makefile("wb", buffering=0))
-    else:
-        io = local
+        # Setting a 4KB buffer here if possible, if not, it still works, so we will suppress all exceptions
+        with suppress(Exception):
+            local.setsockopt(SO_SNDBUF, SOL_SOCKET, BUFFER_SIZE)
+        # set nonblocking to True so that send or sendall waits till all data is sent
+        local.setblocking(True)

Review Comment:
   Do we have any latency concerns at scale due to this?



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -128,32 +124,26 @@
     TerminalTIState.SUCCESS,
 ]
 
-
-@overload
-def mkpipe() -> tuple[socket, socket]: ...
-
-
-@overload
-def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+# Setting a fair buffer size here to handle most message sizes. Intention is to enforce a buffer size
+# that is big enough to handle small to medium messages while not enforcing hard latency issues
+BUFFER_SIZE = 4096

Review Comment:
   Is it worth documenting somewhere that messages above 4 KB will still fail?
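
   One way to check this locally is a small experiment like the sketch below. It is not the supervisor code: `roundtrip` and the reader thread are illustrative names, and it assumes a peer that keeps draining the socket. It sends a payload much larger than `BUFFER_SIZE` over a blocking socketpair and returns what arrived on the other end. Note that `socket.setsockopt` takes the level first and the option name second.

```python
import socket
import threading

BUFFER_SIZE = 4096  # same value as the constant in the diff


def roundtrip(payload: bytes) -> bytes:
    """Send payload over a socketpair with a small send buffer; return what was read."""
    rsock, wsock = socket.socketpair()
    try:
        # Signature is setsockopt(level, optname, value).
        # The kernel may round or clamp the requested size.
        wsock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUFFER_SIZE)
    except OSError:
        pass  # best effort, mirroring the intent of the diff
    wsock.setblocking(True)

    chunks: list[bytes] = []

    def reader() -> None:
        # Drain the socket until the writer closes its end.
        while True:
            chunk = rsock.recv(BUFFER_SIZE)
            if not chunk:
                break
            chunks.append(chunk)

    thread = threading.Thread(target=reader)
    thread.start()
    wsock.sendall(payload)  # blocks until the kernel has accepted every byte
    wsock.close()  # signals EOF to the reader
    thread.join()
    rsock.close()
    return b"".join(chunks)
```

   In blocking mode `sendall` waits for buffer space instead of raising, so a large message should only stall, not fail, as long as the other side keeps reading. If the reader stops, the writer blocks indefinitely, which is the case that would be worth documenting.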



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -128,32 +124,26 @@
     TerminalTIState.SUCCESS,
 ]
 
-
-@overload
-def mkpipe() -> tuple[socket, socket]: ...
-
-
-@overload
-def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+# Setting a fair buffer size here to handle most message sizes. Intention is to enforce a buffer size
+# that is big enough to handle small to medium messages while not enforcing hard latency issues
+BUFFER_SIZE = 4096

Review Comment:
   Or do we want to make this configurable?
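
   As a rough sketch of what "configurable" could look like, assuming an environment variable override: the variable name `EXAMPLE_SUPERVISOR_BUFFER_SIZE` and the helper `get_buffer_size` are hypothetical and do not exist in Airflow. A real change would more likely go through the Airflow configuration system.

```python
import os
from collections.abc import Mapping

DEFAULT_BUFFER_SIZE = 4096  # current hard-coded value from the diff
ENV_VAR = "EXAMPLE_SUPERVISOR_BUFFER_SIZE"  # hypothetical name, for illustration only


def get_buffer_size(env: Mapping[str, str] = os.environ) -> int:
    """Return the buffer size from the environment, falling back to the default."""
    raw = env.get(ENV_VAR)
    if raw is None:
        return DEFAULT_BUFFER_SIZE
    try:
        value = int(raw)
    except ValueError:
        # Ignore values that are not integers.
        return DEFAULT_BUFFER_SIZE
    # Reject zero and negative sizes.
    return value if value > 0 else DEFAULT_BUFFER_SIZE
```

   Falling back to the default on bad input keeps the pipe usable even when the override is misconfigured.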


