This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-do-not-submit-busyjobs
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3c173d596bfa17b8432e5c46a9624cbf8f1471e8
Author: Maciej Obuchowski <obuchowski.mac...@gmail.com>
AuthorDate: Thu Apr 4 21:01:23 2024 +0200

    openlineage: skip sending events if the executor is busy
    
    Signed-off-by: Maciej Obuchowski <obuchowski.mac...@gmail.com>
---
 airflow/providers/openlineage/plugins/listener.py | 24 +++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 25ded6d7f4..a839b83cd7 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,9 +17,10 @@
 from __future__ import annotations
 
 import logging
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Callable
 
 from openlineage.client.serde import Serde
 
@@ -250,7 +251,10 @@ class OpenLineageListener:
     @property
     def executor(self):
         if not self._executor:
-            self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
+            max_workers = 8
+            self._executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="openlineage_")
+            # One permit per worker thread; ``executor_submit`` drops events when none is free.
+            self._busy_semaphore = threading.Semaphore(max_workers)
         return self._executor
 
     @hookimpl
@@ -277,6 +280,29 @@ class OpenLineageListener:
             nominal_end_time=data_interval_end,
         )
 
+    def executor_submit(self, callable: Callable, *args, **kwargs):
+        """Submit ``callable(*args, **kwargs)`` to the executor unless it is busy.
+
+        At most ``max_workers`` submissions may be in flight at once; when that
+        limit is reached the call is dropped (logged at debug level) instead of
+        being queued behind the in-flight ones.
+        """
+        # Read the property first: it lazily creates both the executor and
+        # ``_busy_semaphore``, so the semaphore is guaranteed to exist below.
+        executor = self.executor
+        if not self._busy_semaphore.acquire(blocking=False):
+            self.log.debug("Executor is busy; skipping sending OpenLineage event")
+            return
+        try:
+            future = executor.submit(callable, *args, **kwargs)
+        except BaseException:
+            # Submission failed (e.g. executor already shut down): return the permit.
+            self._busy_semaphore.release()
+            raise
+        # Done-callbacks fire on success, on exception and on cancellation, so the
+        # permit is released even if a queued ``callable`` never gets to run.
+        future.add_done_callback(lambda _: self._busy_semaphore.release())
+
     @hookimpl
     def on_dag_run_success(self, dag_run: DagRun, msg: str):
         if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
@@ -284,7 +300,7 @@ class OpenLineageListener:
         if not self.executor:
             self.log.debug("Executor have not started before `on_dag_run_success`")
             return
-        self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
+        self.executor_submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
 
     @hookimpl
     def on_dag_run_failed(self, dag_run: DagRun, msg: str):
@@ -293,7 +309,7 @@ class OpenLineageListener:
         if not self.executor:
             self.log.debug("Executor have not started before `on_dag_run_failed`")
             return
-        self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
+        self.executor_submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
 
 
 def get_openlineage_listener() -> OpenLineageListener:

Reply via email to