This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-execute-in-thread
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6c05e7503bafb0cf1401200f9d5100e2f1286099
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Jun 15 17:53:06 2026 +0200

    OpenLineage: add execute_in_thread to emit task events without forking
    
    By default the OpenLineage listener emits each task-level event from a
    forked child process (os.fork() with no exec). That child inherits the
    task runner's connection to the Airflow supervisor; if the child's event
    emission blocks (e.g. a slow or unreachable lineage backend), the
    inherited connection can be left in a state that prevents the task from
    being marked complete, leaving it stuck in the `running` state.
    
    Add an opt-in `[openlineage] execute_in_thread` option (default False).
    When enabled, task-level emission runs in a daemon thread instead of a
    forked child: no child process is created, so the supervisor connection is
    never duplicated and a blocked emission cannot strand the task. The task
    runner waits at most `[openlineage] execution_timeout` for emission before
    proceeding; an emission still running after that is abandoned, not killed.
    Metadata extraction still runs in-process with full access to the task
    runtime, so Operators whose extractors resolve Connections, Variables or
    XComs keep working.
    
    The default (fork) path is unchanged.
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 providers/openlineage/provider.yaml                | 19 +++++
 .../src/airflow/providers/openlineage/conf.py      |  6 ++
 .../providers/openlineage/plugins/listener.py      | 37 +++++++++-
 .../unit/openlineage/plugins/test_listener.py      | 85 ++++++++++++++++++++++
 .../tests/unit/openlineage/test_conf.py            | 17 +++++
 5 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/providers/openlineage/provider.yaml b/providers/openlineage/provider.yaml
index c219b2390f2..51a4c02f027 100644
--- a/providers/openlineage/provider.yaml
+++ b/providers/openlineage/provider.yaml
@@ -182,6 +182,25 @@ config:
         example: ~
         type: integer
         version_added: 1.9.0
+      execute_in_thread:
+        description: |
+          If true, OpenLineage task-level event emission on the worker runs in a time-bounded
+          background thread instead of a forked child process (the default).
+
+          The default fork model duplicates the task runner process - including its connection to
+          the Airflow supervisor - into a short-lived child. If the child's event emission blocks
+          (for example on a slow or unreachable lineage backend), the inherited supervisor
+          connection can be left in a state that prevents the task from being marked complete,
+          leaving it stuck in the ``running`` state. Running emission in a thread avoids forking
+          entirely, so nothing is inherited and the task runner can never be blocked past
+          ``[openlineage] execution_timeout`` by event emission.
+
+          Metadata extraction still runs in-process with full access to the task runtime, so
+          Operators whose extractors resolve Connections, Variables or XComs continue to work.
+        default: "False"
+        example: ~
+        type: boolean
+        version_added: 2.19.0
       extractors:
         description: |
           Register custom OpenLineage Extractors by passing a string of semicolon separated full import paths.
diff --git a/providers/openlineage/src/airflow/providers/openlineage/conf.py b/providers/openlineage/src/airflow/providers/openlineage/conf.py
index 3a2ca201e53..5aa8c47ba05 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/conf.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/conf.py
@@ -169,6 +169,12 @@ def execution_timeout() -> int:
     return conf.getint(_CONFIG_SECTION, "execution_timeout", fallback="10")
 
 
+@cache
+def execute_in_thread() -> bool:
+    """[openlineage] execute_in_thread."""
+    return conf.getboolean(_CONFIG_SECTION, "execute_in_thread", fallback="False")
+
+
 @cache
 def include_full_task_info() -> bool:
     """[openlineage] include_full_task_info."""
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 2d4d74e828f..7be2ba00d95 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 import logging
 import os
 import sys
+import threading
 from concurrent.futures import ProcessPoolExecutor
 from concurrent.futures.process import BrokenProcessPool
 from datetime import datetime
@@ -819,10 +820,44 @@ class OpenLineageListener:
 
     def _execute(self, callable, callable_name: str, use_fork: bool = False):
         if use_fork:
-            self._fork_execute(callable, callable_name)
+            if conf.execute_in_thread():
+                self._thread_execute(callable, callable_name)
+            else:
+                self._fork_execute(callable, callable_name)
         else:
             callable()
 
+    def _thread_execute(self, callable, callable_name: str):
+        """
+        Run OpenLineage event emission in a daemon thread with a bounded wait.
+
+        Opt-in alternative to :meth:`_fork_execute`, enabled via
+        ``[openlineage] execute_in_thread``. Unlike forking, this never duplicates the
+        task runner process, so the supervisor connection is never copied into a child
+        whose blocked emission could leave the task stuck in the ``running`` state. The
+        caller waits at most ``[openlineage] execution_timeout``; an emission still running
+        after that is abandoned, not killed. The callable runs in-process with full access
+        to the task runtime, so extractors resolving Connections, Variables or XComs work.
+        """
+        thread = threading.Thread(
+            target=callable,
+            name=f"openlineage-{callable_name}",
+            daemon=True,
+        )
+        thread.start()
+        thread.join(timeout=conf.execution_timeout())
+        if thread.is_alive():
+            # Emission is still running. Python threads cannot be killed, so unlike the fork
+            # path (which terminates an over-running child) we simply stop waiting. As a daemon
+            # the thread will not block interpreter exit, but until then it keeps running in
+            # this process and may still be touching shared task-runtime state.
+            self.log.warning(
+                "OpenLineage %s thread did not finish within execution_timeout=%ss and will be "
+                "abandoned. This has no impact on actual task execution status.",
+                callable_name,
+                conf.execution_timeout(),
+            )
+
     def _terminate_with_wait(self, process: psutil.Process):
         process.terminate()
         try:
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
index edec29202c0..716db433427 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
@@ -2319,3 +2319,88 @@ class TestOpenLineageSelectiveEnableAirflow2:
 
         assert expected_call_count == listener._executor.submit.call_count
         assert expected_task_call_count == listener.extractor_manager.extract_metadata.call_count
+
+
+class TestExecuteRouting:
+    """Tests for `_execute` fork/thread routing and the `_thread_execute` bound (#65714 follow-up)."""
+
+    @conf_vars({("openlineage", "execute_in_thread"): "False"})
+    @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._thread_execute")
+    @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._fork_execute")
+    def test_execute_uses_fork_by_default(self, mock_fork, mock_thread):
+        listener = OpenLineageListener()
+        callable_ = MagicMock()
+
+        listener._execute(callable_, "on_running", use_fork=True)
+
+        mock_fork.assert_called_once_with(callable_, "on_running")
+        mock_thread.assert_not_called()
+        callable_.assert_not_called()
+
+    @conf_vars({("openlineage", "execute_in_thread"): "True"})
+    @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._thread_execute")
+    @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._fork_execute")
+    def test_execute_uses_thread_when_enabled(self, mock_fork, mock_thread):
+        listener = OpenLineageListener()
+        callable_ = MagicMock()
+
+        listener._execute(callable_, "on_running", use_fork=True)
+
+        mock_thread.assert_called_once_with(callable_, "on_running")
+        mock_fork.assert_not_called()
+        callable_.assert_not_called()
+
+    @conf_vars({("openlineage", "execute_in_thread"): "True"})
+    @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._thread_execute")
+    @patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._fork_execute")
+    def test_execute_runs_inline_without_fork_flag(self, mock_fork, mock_thread):
+        # use_fork=False always runs the callable directly, regardless of execute_in_thread.
+        listener = OpenLineageListener()
+        callable_ = MagicMock()
+
+        listener._execute(callable_, "on_state_change", use_fork=False)
+
+        callable_.assert_called_once_with()
+        mock_fork.assert_not_called()
+        mock_thread.assert_not_called()
+
+    @conf_vars({("openlineage", "execute_in_thread"): "True", ("openlineage", "execution_timeout"): "5"})
+    def test_thread_execute_runs_callable_to_completion(self):
+        listener = OpenLineageListener()
+        result = {"ran": False}
+
+        def _emit():
+            result["ran"] = True
+
+        listener._thread_execute(_emit, "on_running")
+
+        assert result["ran"] is True
+
+    @conf_vars({("openlineage", "execute_in_thread"): "True", ("openlineage", "execution_timeout"): "1"})
+    def test_thread_execute_is_bounded_and_abandons_overrunning_emission(self):
+        import threading
+        import time
+
+        listener = OpenLineageListener()
+        started = threading.Event()
+        release = threading.Event()
+        finished = {"v": False}
+
+        def _slow_emit():
+            started.set()
+            # Block far longer than execution_timeout; released by the test for cleanup.
+            release.wait(timeout=30)
+            finished["v"] = True
+
+        try:
+            t0 = time.monotonic()
+            listener._thread_execute(_slow_emit, "on_running")
+            elapsed = time.monotonic() - t0
+
+            # The emission started, but `_thread_execute` returned without waiting for it:
+            # bounded by execution_timeout (1s), never blocking for the full 30s emission.
+            assert started.is_set()
+            assert finished["v"] is False
+            assert elapsed < 10
+        finally:
+            release.set()
diff --git a/providers/openlineage/tests/unit/openlineage/test_conf.py b/providers/openlineage/tests/unit/openlineage/test_conf.py
index 5ee497a6151..61e32d46d1b 100644
--- a/providers/openlineage/tests/unit/openlineage/test_conf.py
+++ b/providers/openlineage/tests/unit/openlineage/test_conf.py
@@ -30,6 +30,7 @@ from airflow.providers.openlineage.conf import (
     dag_state_change_process_pool_size,
     debug_mode,
     disabled_operators,
+    execute_in_thread,
     execution_timeout,
     include_full_task_info,
     is_disabled,
@@ -63,6 +64,7 @@ _CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable"
 _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE = "dag_state_change_process_pool_size"
 _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO = "include_full_task_info"
 _CONFIG_OPTION_DEBUG_MODE = "debug_mode"
+_CONFIG_OPTION_EXECUTE_IN_THREAD = "execute_in_thread"
 _CONFIG_OPTION_SPARK_INJECT_PARENT_JOB_INFO = "spark_inject_parent_job_info"
 _CONFIG_OPTION_SPARK_INJECT_TRANSPORT_INFO = "spark_inject_transport_info"
 
@@ -629,6 +631,21 @@ def test_debug_mode(var_string, expected):
         assert result is expected
 
 
+@pytest.mark.parametrize(
+    ("var_string", "expected"),
+    _BOOL_PARAMS,
+)
+def test_execute_in_thread(var_string, expected):
+    with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_EXECUTE_IN_THREAD): var_string}):
+        result = execute_in_thread()
+        assert result is expected
+
+
+@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_EXECUTE_IN_THREAD): None})
+def test_execute_in_thread_do_not_fail_if_conf_option_missing():
+    assert execute_in_thread() is False
+
+
 @pytest.mark.parametrize(
     "var_string",
     (

Reply via email to