This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-do-not-submit-busyjobs
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3c173d596bfa17b8432e5c46a9624cbf8f1471e8
Author: Maciej Obuchowski <obuchowski.mac...@gmail.com>
AuthorDate: Thu Apr 4 21:01:23 2024 +0200

    openlineage: skip sending events if the executor is busy

    Signed-off-by: Maciej Obuchowski <obuchowski.mac...@gmail.com>
---
 airflow/providers/openlineage/plugins/listener.py | 24 +++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 25ded6d7f4..a839b83cd7 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,9 +17,10 @@
 from __future__ import annotations
 
 import logging
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Callable
 
 from openlineage.client.serde import Serde
 
@@ -250,7 +251,9 @@ class OpenLineageListener:
     @property
     def executor(self):
         if not self._executor:
-            self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
+            max_workers = 8
+            self._executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="openlineage_")
+            self._busy_semaphore = threading.Semaphore(max_workers)
         return self._executor
 
     @hookimpl
@@ -277,6 +280,19 @@ class OpenLineageListener:
             nominal_end_time=data_interval_end,
         )
 
+    def executor_submit(self, callable: Callable, *args, **kwargs):
+        def execute():
+            try:
+                callable(*args, **kwargs)
+            finally:
+                self._busy_semaphore.release()
+
+        acquired = self._busy_semaphore.acquire(blocking=False)
+        if not acquired:
+            self.log.debug("Executor is busy; skipping sending OpenLineage event")
+            return
+        self.executor.submit(execute)
+
     @hookimpl
     def on_dag_run_success(self, dag_run: DagRun, msg: str):
         if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
@@ -284,7 +300,7 @@ class OpenLineageListener:
         if not self.executor:
             self.log.debug("Executor have not started before `on_dag_run_success`")
             return
-        self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
+        self.executor_submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
 
     @hookimpl
     def on_dag_run_failed(self, dag_run: DagRun, msg: str):
@@ -293,7 +309,7 @@ class OpenLineageListener:
         if not self.executor:
             self.log.debug("Executor have not started before `on_dag_run_failed`")
             return
-        self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
+        self.executor_submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
 
 
 def get_openlineage_listener() -> OpenLineageListener: