kacpermuda commented on code in PR #68708:
URL: https://github.com/apache/airflow/pull/68708#discussion_r3435069693


##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -819,10 +820,44 @@ def _on_task_instance_manual_state_change(
 
     def _execute(self, callable, callable_name: str, use_fork: bool = False):
         if use_fork:
-            self._fork_execute(callable, callable_name)
+            if conf.execute_in_thread():
+                self._thread_execute(callable, callable_name)
+            else:
+                self._fork_execute(callable, callable_name)
         else:
             callable()
 
+    def _thread_execute(self, callable, callable_name: str):
+        """
+        Run OpenLineage event emission in a time-bounded daemon thread.
+
+        Opt-in alternative to :meth:`_fork_execute`, enabled via
+        ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+        task runner process, so the supervisor connection (and every other inherited
+        resource) is left untouched -- a blocked emission can therefore never leave the
+        task stuck in the ``running`` state. Metadata extraction still runs in-process
+        with full access to the task runtime, so Operators whose extractors resolve
+        Connections, Variables or XComs keep working.
+        """
+        thread = threading.Thread(
+            target=callable,
+            name=f"openlineage-{callable_name}",
+            daemon=True,
+        )
+        thread.start()
+        thread.join(timeout=conf.execution_timeout())
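
A minimal sketch of what the time bound in `_thread_execute` does and does not guarantee, using only the standard `threading` module. `thread_execute` and `slow_emit` are hypothetical stand-ins, and the explicit `timeout` argument replaces `conf.execution_timeout()`. `Thread.join(timeout=...)` only limits how long the caller waits. It does not stop the thread, which stays alive (and keeps holding whatever it holds) after the timeout expires.

```python
import threading
import time


def thread_execute(func, name: str, timeout: float) -> bool:
    """Run ``func`` in a daemon thread and wait at most ``timeout`` seconds.

    Returns True if ``func`` finished in time, False if it was abandoned.
    Hypothetical sketch: ``timeout`` stands in for ``conf.execution_timeout()``.
    """
    thread = threading.Thread(target=func, name=f"openlineage-{name}", daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    # join() returning only means we stopped waiting; the thread is not killed.
    return not thread.is_alive()


def slow_emit() -> None:
    # Hypothetical stand-in for an emission that hangs longer than the timeout.
    time.sleep(0.5)


finished = thread_execute(slow_emit, "slow", timeout=0.05)
print(finished)  # False: the caller moved on, but the thread is still running
```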

Review Comment:
    [nit] Correctness: an abandoned emission thread can still block the task runner via the shared `SUPERVISOR_COMMS` lock

    The thread path's stated guarantee is that emission can never block the task runner past `execution_timeout`. However, the emission callable runs `extract_metadata`, which on Airflow 3 resolves Connections, Variables and XComs through the single shared `SUPERVISOR_COMMS` channel, whose `_thread_lock` is held for the entire request/response round trip (`task_sdk/.../comms.py send()`). If the abandoned daemon thread is mid-`send()` when the main task thread next needs the supervisor channel, the main thread blocks on that lock until the abandoned thread's round trip completes. That reintroduces, in thread form, a variant of the supervisor-channel stall the fork path was meant to avoid. Unlike the forked child (which is terminated via `_terminate_with_wait`), the thread is neither isolated from this shared resource nor killable. The likelihood is low in the common slow-backend case (the hang is usually in the adapter's HTTP emit, not under the supervisor lock), so this is a caveat rather than a break, but it does qualify the documented "can never block" claim.
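
    A self-contained sketch of the contention described above, assuming only what the comment states about `send()`: a single lock held across the whole request/response round trip. `FakeComms` and the message strings are hypothetical and are not the Task SDK API. The emission thread is abandoned after its join timeout, yet the main thread's next `send()` still waits for it.

```python
import threading
import time


class FakeComms:
    """Hypothetical stand-in for a shared supervisor channel.

    Models only the property described in the review: one lock held for the
    whole request/response round trip. Not the real Task SDK class.
    """

    def __init__(self) -> None:
        self._thread_lock = threading.Lock()

    def send(self, msg: str, response_delay: float = 0.0) -> str:
        with self._thread_lock:
            # Simulate writing the request and waiting for the supervisor's reply.
            time.sleep(response_delay)
            return f"ack:{msg}"


comms = FakeComms()

# Emission thread: resolves a Connection through the shared channel, slowly.
emitter = threading.Thread(
    target=comms.send, args=("get_connection", 0.5), daemon=True
)
emitter.start()
emitter.join(timeout=0.05)  # the time-bounded wait gives up here...
assert emitter.is_alive()   # ...but the thread is still inside send()

# Main task thread now needs the channel, e.g. to fetch an XCom.
start = time.monotonic()
reply = comms.send("get_xcom")
waited = time.monotonic() - start
# The main thread was blocked until the emitter's round trip released the lock.
print(reply, f"waited ~{waited:.2f}s")
```

    On a typical run the final `send()` waits roughly the remainder of the emitter's 0.5 s round trip, which is the stall the comment warns about.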


