This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new fbcee8d01b fix: scheduler crashing with OL provider on airflow standalone (#40353)

fbcee8d01b is described below

commit fbcee8d01bddd100d9335404796a40247a6c6487
Author: Kacper Muda <mudakac...@gmail.com>
AuthorDate: Fri Jun 21 15:46:10 2024 +0200

    fix: scheduler crashing with OL provider on airflow standalone (#40353)

    Signed-off-by: Kacper Muda <mudakac...@gmail.com>
---
 airflow/providers/openlineage/plugins/listener.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 12ebe7c6e6..43553e8ba9 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -62,6 +62,18 @@ def _get_try_number_success(val):
     return val.try_number - 1
 
 
+def _executor_initializer():
+    """
+    Initialize worker processes for the executor used for DagRun listener.
+
+    This function must be picklable, so it cannot be defined as an inner method or local function.
+
+    Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with
+    the Airflow database.
+    """
+    settings.configure_orm()
+
+
 class OpenLineageListener:
     """OpenLineage listener sends events on task instance and dag run starts, completes and failures."""
 
@@ -366,16 +378,10 @@ class OpenLineageListener:
 
     @property
     def executor(self) -> ProcessPoolExecutor:
-        # Executor for dag_run listener
-        def initializer():
-            # Re-configure the ORM engine as there are issues with multiple processes
-            # if process calls Airflow DB.
-            settings.configure_orm()
-
         if not self._executor:
             self._executor = ProcessPoolExecutor(
                 max_workers=conf.dag_state_change_process_pool_size(),
-                initializer=initializer,
+                initializer=_executor_initializer(),
             )
         return self._executor
 