This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fbcee8d01b fix: scheduler crashing with OL provider on airflow standalone (#40353)
fbcee8d01b is described below

commit fbcee8d01bddd100d9335404796a40247a6c6487
Author: Kacper Muda <mudakac...@gmail.com>
AuthorDate: Fri Jun 21 15:46:10 2024 +0200

    fix: scheduler crashing with OL provider on airflow standalone (#40353)
    
    Signed-off-by: Kacper Muda <mudakac...@gmail.com>
---
 airflow/providers/openlineage/plugins/listener.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 12ebe7c6e6..43553e8ba9 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -62,6 +62,18 @@ def _get_try_number_success(val):
     return val.try_number - 1
 
 
+def _executor_initializer():
+    """
+    Initialize worker processes for the executor used for DagRun listener.
+
+    This function must be picklable, so it cannot be defined as an inner method or local function.
+
+    Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with
+    the Airflow database.
+    """
+    settings.configure_orm()
+
+
 class OpenLineageListener:
     """OpenLineage listener sends events on task instance and dag run starts, completes and failures."""
 
@@ -366,16 +378,10 @@ class OpenLineageListener:
 
     @property
     def executor(self) -> ProcessPoolExecutor:
-        # Executor for dag_run listener
-        def initializer():
-            # Re-configure the ORM engine as there are issues with multiple processes
-            # if process calls Airflow DB.
-            settings.configure_orm()
-
         if not self._executor:
             self._executor = ProcessPoolExecutor(
                 max_workers=conf.dag_state_change_process_pool_size(),
-                initializer=initializer,
+                initializer=_executor_initializer,
             )
         return self._executor
 

Reply via email to