This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch openlineage-execute-in-thread in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6c05e7503bafb0cf1401200f9d5100e2f1286099 Author: Maciej Obuchowski <[email protected]> AuthorDate: Mon Jun 15 17:53:06 2026 +0200 OpenLineage: add execute_in_thread to emit task events without forking By default the OpenLineage listener emits each task-level event from a forked child process (os.fork() with no exec). That child inherits the task runner's connection to the Airflow supervisor; if the child's event emission blocks (e.g. a slow or unreachable lineage backend), the inherited connection can be left in a state that prevents the task from being marked complete, leaving it stuck in the `running` state. Add an opt-in `[openlineage] execute_in_thread` option (default False). When enabled, task-level emission runs in a time-bounded daemon thread instead of forking: nothing is inherited, so a blocked emission can never strand the task, and the task runner waits at most `[openlineage] execution_timeout` for emission before proceeding. Metadata extraction still runs in-process with full access to the task runtime, so Operators whose extractors resolve Connections, Variables or XComs keep working. The default (fork) path is unchanged. 
Signed-off-by: Maciej Obuchowski <[email protected]> --- providers/openlineage/provider.yaml | 19 +++++ .../src/airflow/providers/openlineage/conf.py | 6 ++ .../providers/openlineage/plugins/listener.py | 37 +++++++++- .../unit/openlineage/plugins/test_listener.py | 85 ++++++++++++++++++++++ .../tests/unit/openlineage/test_conf.py | 17 +++++ 5 files changed, 163 insertions(+), 1 deletion(-) diff --git a/providers/openlineage/provider.yaml b/providers/openlineage/provider.yaml index c219b2390f2..51a4c02f027 100644 --- a/providers/openlineage/provider.yaml +++ b/providers/openlineage/provider.yaml @@ -182,6 +182,25 @@ config: example: ~ type: integer version_added: 1.9.0 + execute_in_thread: + description: | + If true, OpenLineage task-level event emission on the worker runs in a time-bounded + background thread instead of a forked child process (the default). + + The default fork model duplicates the task runner process - including its connection to + the Airflow supervisor - into a short-lived child. If the child's event emission blocks + (for example on a slow or unreachable lineage backend), the inherited supervisor + connection can be left in a state that prevents the task from being marked complete, + leaving it stuck in the ``running`` state. Running emission in a thread avoids forking + entirely, so nothing is inherited and the task runner can never be blocked past + ``[openlineage] execution_timeout`` by event emission. + + Metadata extraction still runs in-process with full access to the task runtime, so + Operators whose extractors resolve Connections, Variables or XComs continue to work. + default: "False" + example: ~ + type: boolean + version_added: 2.19.0 extractors: description: | Register custom OpenLineage Extractors by passing a string of semicolon separated full import paths. 
diff --git a/providers/openlineage/src/airflow/providers/openlineage/conf.py b/providers/openlineage/src/airflow/providers/openlineage/conf.py index 3a2ca201e53..5aa8c47ba05 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/conf.py +++ b/providers/openlineage/src/airflow/providers/openlineage/conf.py @@ -169,6 +169,12 @@ def execution_timeout() -> int: return conf.getint(_CONFIG_SECTION, "execution_timeout", fallback="10") +@cache +def execute_in_thread() -> bool: + """[openlineage] execute_in_thread.""" + return conf.getboolean(_CONFIG_SECTION, "execute_in_thread", fallback="False") + + @cache def include_full_task_info() -> bool: """[openlineage] include_full_task_info.""" diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 2d4d74e828f..7be2ba00d95 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -19,6 +19,7 @@ from __future__ import annotations import logging import os import sys +import threading from concurrent.futures import ProcessPoolExecutor from concurrent.futures.process import BrokenProcessPool from datetime import datetime @@ -819,10 +820,44 @@ class OpenLineageListener: def _execute(self, callable, callable_name: str, use_fork: bool = False): if use_fork: - self._fork_execute(callable, callable_name) + if conf.execute_in_thread(): + self._thread_execute(callable, callable_name) + else: + self._fork_execute(callable, callable_name) else: callable() + def _thread_execute(self, callable, callable_name: str): + """ + Run OpenLineage event emission in a time-bounded daemon thread. + + Opt-in alternative to :meth:`_fork_execute`, enabled via + ``[openlineage] execute_in_thread``. 
Unlike forking, this never duplicates the + task runner process, so the supervisor connection (and every other inherited + resource) is left untouched -- a blocked emission can therefore never leave the + task stuck in the ``running`` state. Metadata extraction still runs in-process + with full access to the task runtime, so Operators whose extractors resolve + Connections, Variables or XComs keep working. + """ + thread = threading.Thread( + target=callable, + name=f"openlineage-{callable_name}", + daemon=True, + ) + thread.start() + thread.join(timeout=conf.execution_timeout()) + if thread.is_alive(): + # Emission is still running. We deliberately do not keep waiting: the thread is + # a daemon holding only its own backend connection (never the supervisor socket), + # so abandoning it cannot block the task runner or the worker -- it is reaped when + # the process exits. This mirrors the fork path terminating an over-running child. + self.log.warning( + "OpenLineage %s thread did not finish within execution_timeout=%ss and will be " + "abandoned. 
This has no impact on actual task execution status.", + callable_name, + conf.execution_timeout(), + ) + def _terminate_with_wait(self, process: psutil.Process): process.terminate() try: diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index edec29202c0..716db433427 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -2319,3 +2319,88 @@ class TestOpenLineageSelectiveEnableAirflow2: assert expected_call_count == listener._executor.submit.call_count assert expected_task_call_count == listener.extractor_manager.extract_metadata.call_count + + +class TestExecuteRouting: + """Tests for `_execute` fork/thread routing and the `_thread_execute` bound (#65714 follow-up).""" + + @conf_vars({("openlineage", "execute_in_thread"): "False"}) + @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._thread_execute") + @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._fork_execute") + def test_execute_uses_fork_by_default(self, mock_fork, mock_thread): + listener = OpenLineageListener() + callable_ = MagicMock() + + listener._execute(callable_, "on_running", use_fork=True) + + mock_fork.assert_called_once_with(callable_, "on_running") + mock_thread.assert_not_called() + callable_.assert_not_called() + + @conf_vars({("openlineage", "execute_in_thread"): "True"}) + @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._thread_execute") + @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._fork_execute") + def test_execute_uses_thread_when_enabled(self, mock_fork, mock_thread): + listener = OpenLineageListener() + callable_ = MagicMock() + + listener._execute(callable_, "on_running", use_fork=True) + + mock_thread.assert_called_once_with(callable_, "on_running") + 
mock_fork.assert_not_called() + callable_.assert_not_called() + + @conf_vars({("openlineage", "execute_in_thread"): "True"}) + @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._thread_execute") + @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._fork_execute") + def test_execute_runs_inline_without_fork_flag(self, mock_fork, mock_thread): + # use_fork=False always runs the callable directly, regardless of execute_in_thread. + listener = OpenLineageListener() + callable_ = MagicMock() + + listener._execute(callable_, "on_state_change", use_fork=False) + + callable_.assert_called_once_with() + mock_fork.assert_not_called() + mock_thread.assert_not_called() + + @conf_vars({("openlineage", "execute_in_thread"): "True", ("openlineage", "execution_timeout"): "5"}) + def test_thread_execute_runs_callable_to_completion(self): + listener = OpenLineageListener() + result = {"ran": False} + + def _emit(): + result["ran"] = True + + listener._thread_execute(_emit, "on_running") + + assert result["ran"] is True + + @conf_vars({("openlineage", "execute_in_thread"): "True", ("openlineage", "execution_timeout"): "1"}) + def test_thread_execute_is_bounded_and_abandons_overrunning_emission(self): + import threading + import time + + listener = OpenLineageListener() + started = threading.Event() + release = threading.Event() + finished = {"v": False} + + def _slow_emit(): + started.set() + # Block far longer than execution_timeout; released by the test for cleanup. + release.wait(timeout=30) + finished["v"] = True + + try: + t0 = time.monotonic() + listener._thread_execute(_slow_emit, "on_running") + elapsed = time.monotonic() - t0 + + # The emission started, but `_thread_execute` returned without waiting for it: + # bounded by execution_timeout (1s), never blocking for the full 30s emission. 
+ assert started.is_set() + assert finished["v"] is False + assert elapsed < 10 + finally: + release.set() diff --git a/providers/openlineage/tests/unit/openlineage/test_conf.py b/providers/openlineage/tests/unit/openlineage/test_conf.py index 5ee497a6151..61e32d46d1b 100644 --- a/providers/openlineage/tests/unit/openlineage/test_conf.py +++ b/providers/openlineage/tests/unit/openlineage/test_conf.py @@ -30,6 +30,7 @@ from airflow.providers.openlineage.conf import ( dag_state_change_process_pool_size, debug_mode, disabled_operators, + execute_in_thread, execution_timeout, include_full_task_info, is_disabled, @@ -63,6 +64,7 @@ _CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable" _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE = "dag_state_change_process_pool_size" _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO = "include_full_task_info" _CONFIG_OPTION_DEBUG_MODE = "debug_mode" +_CONFIG_OPTION_EXECUTE_IN_THREAD = "execute_in_thread" _CONFIG_OPTION_SPARK_INJECT_PARENT_JOB_INFO = "spark_inject_parent_job_info" _CONFIG_OPTION_SPARK_INJECT_TRANSPORT_INFO = "spark_inject_transport_info" @@ -629,6 +631,21 @@ def test_debug_mode(var_string, expected): assert result is expected +@pytest.mark.parametrize( + ("var_string", "expected"), + _BOOL_PARAMS, +) +def test_execute_in_thread(var_string, expected): + with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_EXECUTE_IN_THREAD): var_string}): + result = execute_in_thread() + assert result is expected + + +@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_EXECUTE_IN_THREAD): None}) +def test_execute_in_thread_do_not_fail_if_conf_option_missing(): + assert execute_in_thread() is False + + @pytest.mark.parametrize( "var_string", (
