kacpermuda commented on code in PR #68708:
URL: https://github.com/apache/airflow/pull/68708#discussion_r3435069697
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -819,10 +820,44 @@ def _on_task_instance_manual_state_change(
     def _execute(self, callable, callable_name: str, use_fork: bool = False):
         if use_fork:
-            self._fork_execute(callable, callable_name)
+            if conf.execute_in_thread():
+                self._thread_execute(callable, callable_name)
+            else:
+                self._fork_execute(callable, callable_name)
         else:
             callable()
+    def _thread_execute(self, callable, callable_name: str):
+        """
+        Run OpenLineage event emission in a time-bounded daemon thread.
+
+        Opt-in alternative to :meth:`_fork_execute`, enabled via
+        ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+        task runner process, so the supervisor connection (and every other inherited
+        resource) is left untouched -- a blocked emission can therefore never leave the
+        task stuck in the ``running`` state. Metadata extraction still runs in-process
+        with full access to the task runtime, so Operators whose extractors resolve
+        Connections, Variables or XComs keep working.
+        """
+        thread = threading.Thread(
+            target=callable,
+            name=f"openlineage-{callable_name}",
+            daemon=True,
+        )
+        thread.start()
+        thread.join(timeout=conf.execution_timeout())
+        if thread.is_alive():
+            # Emission is still running. We deliberately do not keep waiting: the thread is
Review Comment:
**COR.N2** [nit] Correctness — Abandoned thread reads live shared process
state instead of a fork-time snapshot
`_fork_execute` runs the callable against a copy-on-write snapshot of the
process at fork time, so post-fork mutations in the parent cannot affect the
in-flight event. The thread path shares the same process memory: after
`_thread_execute` returns on timeout and the task runner proceeds
(finalizing/closing the supervisor connection or ORM session), the
still-running emission thread continues reading the same live objects/sessions.
This can surface as partially-built or corrupted events, or exceptions when a
session/socket the thread is using is torn down underneath it (caught by
`@print_warning`, so no crash, but emission silently lost). Risk is low because
emission closures mostly read immutable locals, but the comment "holding only
its own backend connection" here understates this fork-vs-thread semantic
difference.
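
To illustrate the difference, here is a minimal, self-contained sketch that does not use any Airflow code. The `shared` dict, the `emit` function and the `release` event are hypothetical stand-ins for live runner state and a slow emission. It only shows that a thread abandoned after `join(timeout=...)` later observes whatever the main thread has changed in the meantime, where a forked child would have kept its fork-time copy.

```python
import threading

# Hypothetical stand-in for state the task runner tears down after emission.
shared = {"session": "open"}
observed = []

# Used only to make the demo deterministic: emission stalls until released.
release = threading.Event()


def emit():
    # Simulates a slow emission that reads shared state late.
    release.wait()
    observed.append(shared["session"])


thread = threading.Thread(target=emit, name="openlineage-demo", daemon=True)
thread.start()
thread.join(timeout=0.05)      # the join gives up; the thread keeps running
timed_out = thread.is_alive()

shared["session"] = "closed"   # the "task runner" proceeds and mutates state

release.set()                  # let the abandoned emission continue
thread.join()                  # joined here only so the demo can inspect it

# The abandoned thread saw the post-timeout value, not a snapshot.
print(timed_out, observed)
```

In this sketch the thread reads `"closed"` even though the value was `"open"` when the emission started, which is the kind of late read the comment above is concerned about.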
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -819,10 +820,44 @@ def _on_task_instance_manual_state_change(
     def _execute(self, callable, callable_name: str, use_fork: bool = False):
         if use_fork:
-            self._fork_execute(callable, callable_name)
+            if conf.execute_in_thread():
+                self._thread_execute(callable, callable_name)
+            else:
+                self._fork_execute(callable, callable_name)
         else:
             callable()
+    def _thread_execute(self, callable, callable_name: str):
+        """
+        Run OpenLineage event emission in a time-bounded daemon thread.
+
+        Opt-in alternative to :meth:`_fork_execute`, enabled via
+        ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+        task runner process, so the supervisor connection (and every other inherited
+        resource) is left untouched -- a blocked emission can therefore never leave the
+        task stuck in the ``running`` state. Metadata extraction still runs in-process
+        with full access to the task runtime, so Operators whose extractors resolve
+        Connections, Variables or XComs keep working.
+        """
+        thread = threading.Thread(
+            target=callable,
+            name=f"openlineage-{callable_name}",
+            daemon=True,
+        )
+        thread.start()
+        thread.join(timeout=conf.execution_timeout())
Review Comment:
**COR.N1** [nit] Correctness — Abandoned emission thread can still block the
task runner via the shared `SUPERVISOR_COMMS` lock
The thread path's stated guarantee is that emission can never block the task
runner past `execution_timeout`. However, the emission callable runs
`extract_metadata`, which on Airflow 3 resolves Connections/Variables/XComs
through the single shared `SUPERVISOR_COMMS` channel whose `_thread_lock` spans
the entire request/response round trip (`task_sdk/.../comms.py send()`). If the
abandoned daemon thread is mid-`send()` when the main task thread next needs
the supervisor channel, the main thread blocks on that lock until the abandoned
thread's round-trip completes — reintroducing, in thread form, a variant of the
supervisor-channel stall the fork path was meant to avoid. Unlike fork (which
is terminated via `_terminate_with_wait`), the thread is neither isolated from
nor killable on this shared resource. Low likelihood in the common slow-backend
case (the hang is usually in adapter HTTP emit, not under the supervisor lock),
so a caveat rather than a break, but it qualifies the documented "can never
block" claim.
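
The lock interaction can be reproduced with a small standalone sketch. Nothing here is Airflow code: `channel_lock` and `slow_send` are hypothetical stand-ins for a comms channel whose lock spans a whole request/response round trip, and the durations are arbitrary. It shows that timing out the `join` does not release a lock the abandoned thread still holds, so the main thread's next use of the channel waits anyway.

```python
import threading
import time

# Hypothetical stand-in for a channel lock held for a full round trip.
channel_lock = threading.Lock()
in_send = threading.Event()


def slow_send(duration):
    # Holds the lock for the whole simulated request/response exchange.
    with channel_lock:
        in_send.set()
        time.sleep(duration)


worker = threading.Thread(target=slow_send, args=(0.5,), daemon=True)
worker.start()
in_send.wait()                 # make sure the worker already holds the lock
worker.join(timeout=0.05)      # bounded wait expires; the worker is abandoned
abandoned = worker.is_alive()

start = time.monotonic()
with channel_lock:             # the main thread's next request on the channel
    waited = time.monotonic() - start

# The main thread was blocked well past the join timeout.
print(abandoned, round(waited, 2))
```

Here the main thread waits for most of the remaining round trip despite the 0.05 s join timeout. A bounded `channel_lock.acquire(timeout=...)` on the caller side would be one way to cap that wait in this sketch; whether the real channel can be used that way is not established by the diff.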