This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d1b1f74ab7c fix(openlineage): self-heal ProcessPoolExecutor on
BrokenProcessPool (#67400)
d1b1f74ab7c is described below
commit d1b1f74ab7c5f03b87e397d469dda797d59fac0b
Author: Anmol Mishra <[email protected]>
AuthorDate: Sun May 31 03:36:28 2026 +0530
fix(openlineage): self-heal ProcessPoolExecutor on BrokenProcessPool
(#67400)
* fix(openlineage): self-heal ProcessPoolExecutor on BrokenProcessPool
When a child process in the OpenLineage listener's ProcessPoolExecutor
terminates abruptly, concurrent.futures marks the pool as permanently
broken. Every subsequent submission raises BrokenProcessPool and lineage
data stops flowing until the scheduler is restarted.
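This adds self-healing: submit_callable now catches BrokenProcessPool,
logs a warning, shuts down the broken executor without waiting, creates a
fresh one, and retries the submission once so lineage reporting recovers
automatically. If the retry itself fails, the exception propagates to the
caller.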
Closes #67283
* fix(openlineage): sort imports to fix ruff I001 static check
ruff I001: `from concurrent.futures.process import BrokenProcessPool`
must follow `from concurrent.futures import ProcessPoolExecutor`
* fix(openlineage): collapse log.warning to single line for ruff-format
The warning message fits within the line-length limit, so it should not be
split across three lines.
---------
Co-authored-by: Anmol Mishra <[email protected]>
---
.../providers/openlineage/plugins/listener.py | 9 +++++-
.../unit/openlineage/plugins/test_listener.py | 32 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 0f4b0f3ca23..2d4d74e828f 100644
---
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -20,6 +20,7 @@ import logging
import os
import sys
from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures.process import BrokenProcessPool
from datetime import datetime
from functools import cache
from typing import TYPE_CHECKING
@@ -1049,7 +1050,13 @@ class OpenLineageListener:
self.log.warning("OpenLineage received exception in method
on_dag_run_failed", exc_info=e)
def submit_callable(self, callable, *args, **kwargs):
- fut = self.executor.submit(callable, *args, **kwargs)
+ try:
+ fut = self.executor.submit(callable, *args, **kwargs)
+ except BrokenProcessPool:
+ self.log.warning("ProcessPoolExecutor is broken; recreating and
retrying submission.")
+ self._executor.shutdown(wait=False)
+ self._executor = None
+ fut = self.executor.submit(callable, *args, **kwargs)
fut.add_done_callback(self.log_submit_error)
return fut
diff --git
a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
index 25bee70ce75..edec29202c0 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
@@ -2128,6 +2128,38 @@ class TestOpenLineageListenerAirflow3:
listener.log.debug.assert_not_called()
listener.log.warning.assert_called_once()
+ def test_submit_callable_recreates_executor_on_broken_pool(self):
+ """When a child process dies and BrokenProcessPool is raised, the
+ listener should shut down the broken executor, create a fresh one, and
+ retry the submission."""
+ from concurrent.futures.process import BrokenProcessPool
+
+ listener = OpenLineageListener()
+ broken_executor = MagicMock()
+ broken_executor.submit.side_effect = BrokenProcessPool()
+ new_executor = MagicMock()
+ new_future = MagicMock()
+ new_executor.submit.return_value = new_future
+
+ listener._executor = broken_executor
+ listener.log = MagicMock()
+
+ def dummy_callable():
+ pass
+
+ with mock.patch(
+
"airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor",
+ return_value=new_executor,
+ ):
+ fut = listener.submit_callable(dummy_callable, "arg1",
kwarg1="val1")
+
+ broken_executor.shutdown.assert_called_once_with(wait=False)
+ new_executor.submit.assert_called_once_with(dummy_callable, "arg1",
kwarg1="val1")
+
new_future.add_done_callback.assert_called_once_with(listener.log_submit_error)
+ assert fut is new_future
+ listener.log.warning.assert_called_once()
+ assert "recreating" in listener.log.warning.call_args[0][0]
+
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 2 tests")
class TestOpenLineageSelectiveEnableAirflow2: