ashb commented on code in PR #64874:
URL: https://github.com/apache/airflow/pull/64874#discussion_r3049560492


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -441,6 +449,56 @@ def exit(n: int) -> NoReturn:
             exit(125)
 
 
+_USE_FORK_EXEC = sys.platform == "darwin"
+"""On macOS, use fork + exec (``os.fork`` then ``os.execv``) instead of bare ``os.fork``.
+
+macOS system libraries (Security.framework, CoreFoundation, ``_scproxy``) use
+Objective-C, which is not fork-safe.  A bare ``os.fork()`` copies the parent's
+ObjC runtime state; if the child then triggers ObjC class initialization
+(e.g. via ``socket.getaddrinfo`` -> system DNS resolver -> proxy lookup), the
+runtime detects the corrupted state and crashes with SIGABRT.
+
+Calling ``os.execv`` immediately after ``os.fork`` replaces the child's address
+space, giving it clean ObjC state.  The socketpair FDs survive across exec
+because they are marked inheritable; the child reads FD numbers from an env var
+and reconstructs the communication channels.
+
+This applies to all executors (Local, Celery, etc.) but only on macOS and only
+for task execution (``target is _subprocess_main``).  DAG processor and triggerer
+use different targets and keep bare fork -- they don't make network calls that
+trigger the macOS crash.
+
+See: https://github.com/python/cpython/issues/105912
+     https://github.com/apache/airflow/discussions/24463
+"""
+
+# Env var key used to pass socket FDs to the child when using fork+exec.
+_CHILD_FDS_ENV_VAR = "_AIRFLOW_SUPERVISOR_CHILD_FDS"
+
+
+def _child_exec_main():
+    """
+    Entry point for the child process when using fork+exec (macOS).
+
+    Reads socket FD numbers from the environment, reconstructs the sockets,
+    and hands off to ``_fork_main`` -- the same code path as the bare-fork
+    child.
+    """
+    import json
+    import socket as _socket
+
+    fds = json.loads(os.environ.pop(_CHILD_FDS_ENV_VAR))
+    child_requests = _socket.socket(fileno=fds["requests"])
+    child_stdout = _socket.socket(fileno=fds["stdout"])
+    child_stderr = _socket.socket(fileno=fds["stderr"])
+    log_fd = fds["logs"]

Review Comment:
   We've already got a mechanism to pass the FD and re-open the logs socket. We should use that rather than implement a new way. Or, at the very least, we only need to pass the log socket FD, as all the others are guaranteed to be FDs 0, 1 and 2.
   
   Also, stdin, stdout and stderr are inheritable by default and are kept across exec, so we shouldn't need to handle those differently at all.



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,49 @@ def start(
             del constructor_kwargs
             del logger
 
-            try:
-                # Run the child entrypoint
-                _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
-            except BaseException as e:
-                import traceback
-
-                with suppress(BaseException):
-                    # We can't use log here, as if we except out of _fork_main something _weird_ went on.
-                    print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
-                    traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            if _USE_FORK_EXEC and target is _subprocess_main:
+                # macOS: immediately exec a fresh Python interpreter to replace the
+                # inherited ObjC/CoreFoundation state that is not fork-safe.  Only
+                # for task execution (_subprocess_main); DAG processor and triggerer
+                # use different targets and keep bare fork.  The socketpair FDs
+                # survive across exec because we mark them inheritable.
+                try:
+                    import json
+
+                    fds = {
+                        "requests": child_requests.fileno(),
+                        "stdout": child_stdout.fileno(),
+                        "stderr": child_stderr.fileno(),
+                        "logs": child_logs.fileno(),
+                    }
+                    for fd in fds.values():
+                        os.set_inheritable(fd, True)
+
+                    os.environ[_CHILD_FDS_ENV_VAR] = json.dumps(fds)
+                    os.execv(sys.executable, [
+                        sys.executable,
+                        "-c",
+                        "from airflow.sdk.execution_time.supervisor import _child_exec_main;"
+                        " _child_exec_main()",
+                    ])
+                    # execv replaces the process -- we never reach here
+                except BaseException as e:
+                    import traceback
+
+                    with suppress(BaseException):
+                        print(f"execv failed, exiting with code 124: {e}", file=sys.stderr)
+                        traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            else:

Review Comment:
   Style/diff nit:
   
   No need for the `else` here: the `if` block never falls through, so the `else` can be removed and its contents un-indented.
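The nit holds as long as every path through the exec branch ends the process. In the hunk as quoted, the `except` branch only prints, and the code after it is not shown, so it is worth making the branch unconditionally terminal before dropping the `else`. A minimal sketch, assuming a failed exec should exit with 124 as the message in the diff says; `exec_or_die` and `child_entry` are hypothetical names, not part of the PR.

```python
import os
import sys
from contextlib import suppress
from typing import Callable, NoReturn


def exec_or_die(argv: list[str]) -> NoReturn:
    """Replace this process with argv; if exec fails, exit with code 124."""
    try:
        os.execv(argv[0], argv)
    except BaseException as e:
        with suppress(BaseException):
            print(f"execv failed, exiting with code 124: {e}", file=sys.stderr)
    # Only reached when execv raised: a successful exec never returns.
    os._exit(124)


def child_entry(use_exec: bool, argv: list[str], run_in_process: Callable[[], None]) -> None:
    if use_exec:
        exec_or_die(argv)  # NoReturn, so control never continues past this line
    # Bare-fork path: no `else` and no extra indentation needed.
    run_in_process()
```

Annotating the helper as `NoReturn` also lets type checkers confirm that nothing after the `if` runs on the exec path.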



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,49 @@ def start(
             del constructor_kwargs
             del logger
 
-            try:
-                # Run the child entrypoint
-                _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
-            except BaseException as e:
-                import traceback
-
-                with suppress(BaseException):
-                    # We can't use log here, as if we except out of _fork_main something _weird_ went on.
-                    print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
-                    traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            if _USE_FORK_EXEC and target is _subprocess_main:

Review Comment:
   I'm not sure how I feel about the `target is _subprocess_main` part...
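One way to avoid tying the exec decision to the identity of `target` would be an explicit opt-in on the process class. The sketch below is hypothetical: the class and attribute names are invented for illustration and are not part of the PR or of Airflow.

```python
import sys
from typing import ClassVar


class SupervisedProcessSketch:
    """Hypothetical base class: exec-after-fork is an explicit opt-in."""

    # Off by default, so subclasses that don't opt in keep bare fork.
    exec_after_fork_on_macos: ClassVar[bool] = False

    @classmethod
    def should_exec_after_fork(cls, platform: str = sys.platform) -> bool:
        # Only the opted-in classes, and only on macOS, take the exec path.
        return cls.exec_after_fork_on_macos and platform == "darwin"


class TaskProcessSketch(SupervisedProcessSketch):
    """Hypothetical task-running subclass that opts in."""

    exec_after_fork_on_macos = True
```

The supervisor could then check `cls.should_exec_after_fork()` rather than `target is _subprocess_main`, so the behaviour no longer depends on which function object happens to be passed as `target`.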



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
