kacpermuda commented on code in PR #68708:
URL: https://github.com/apache/airflow/pull/68708#discussion_r3435069697
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -819,10 +820,44 @@ def _on_task_instance_manual_state_change(
     def _execute(self, callable, callable_name: str, use_fork: bool = False):
         if use_fork:
-            self._fork_execute(callable, callable_name)
+            if conf.execute_in_thread():
+                self._thread_execute(callable, callable_name)
+            else:
+                self._fork_execute(callable, callable_name)
         else:
             callable()
+    def _thread_execute(self, callable, callable_name: str):
+        """
+        Run OpenLineage event emission in a time-bounded daemon thread.
+
+        Opt-in alternative to :meth:`_fork_execute`, enabled via
+        ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+        task runner process, so the supervisor connection (and every other inherited
+        resource) is left untouched -- a blocked emission can therefore never leave the
+        task stuck in the ``running`` state. Metadata extraction still runs in-process
+        with full access to the task runtime, so Operators whose extractors resolve
+        Connections, Variables or XComs keep working.
+        """
+        thread = threading.Thread(
+            target=callable,
+            name=f"openlineage-{callable_name}",
+            daemon=True,
+        )
+        thread.start()
+        thread.join(timeout=conf.execution_timeout())
+        if thread.is_alive():
+            # Emission is still running. We deliberately do not keep waiting: the thread is
Review Comment:
[nit] Correctness — Abandoned thread reads live shared process state instead
of a fork-time snapshot
`_fork_execute` runs the callable against a copy-on-write snapshot of the
process taken at fork time, so post-fork mutations in the parent cannot affect
the in-flight event. The thread path shares the parent's memory: once
`_thread_execute` returns on timeout and the task runner proceeds (finalizing
or closing the supervisor connection or ORM session), the still-running
emission thread keeps reading the same live objects and sessions. This can
surface as partially built or corrupted events, or as exceptions when a session
or socket the thread is using is torn down underneath it. Those exceptions are
caught by `@print_warning`, so nothing crashes, but the emission is silently
lost. The risk is low because emission closures mostly read immutable locals,
but the "holding only its own backend connection" comment here understates this
fork-vs-thread semantic difference.
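
To make the difference concrete, here is a minimal standalone sketch. It is not
Airflow or OpenLineage code: `emit_after`, `run_with_timeout` and the `status`
key are hypothetical names, and `copy.deepcopy` only approximates the isolation
that a fork-time copy-on-write snapshot provides. The worker is abandoned after
a short `join` timeout, the caller then mutates its state, and the worker reads
that state afterwards.

```python
import copy
import threading


def emit_after(gate: threading.Event, state: dict, out: list) -> None:
    """Stand-in for a slow emission: block, then read the state it was handed."""
    gate.wait(timeout=5)
    out.append(state["status"])


def run_with_timeout(state: dict, snapshot: bool) -> str:
    """Start the worker, give up waiting, mutate state, then let the worker finish."""
    gate = threading.Event()
    out: list = []
    # snapshot=True hands the worker a private copy (roughly what a fork provides);
    # snapshot=False hands it the live object (what a thread sees).
    view = copy.deepcopy(state) if snapshot else state
    worker = threading.Thread(target=emit_after, args=(gate, view, out), daemon=True)
    worker.start()
    worker.join(timeout=0.05)   # caller stops waiting, as on an emission timeout
    state["status"] = "closed"  # caller moves on and changes its own state
    gate.set()                  # the abandoned worker resumes only now
    worker.join()               # wait here only so the result can be inspected
    return out[0]


live_result = run_with_timeout({"status": "running"}, snapshot=False)
snapshot_result = run_with_timeout({"status": "running"}, snapshot=True)
```

In this sketch the live-state run observes the caller's later mutation while the
snapshot run does not. If the thread path is kept, one possible mitigation
(an assumption on my side, not something this PR does) would be to build the
event payload before starting the thread, so that the abandoned thread only
touches data it owns.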