This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a35ec9549d Fixes to how DebugExecutor handles sensors (#28528)
a35ec9549d is described below

commit a35ec9549dc0b4b311f126f3022309ec5b33fa62
Author: Raphaël Vandon <114772123+vandonr-...@users.noreply.github.com>
AuthorDate: Tue Jan 24 21:40:10 2023 -0800

    Fixes to how DebugExecutor handles sensors (#28528)
    
    * move fix to ready_to_reschedule
    * replace check on debug exec with new property
    
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 airflow/executors/debug_executor.py         |  6 ++++++
 airflow/ti_deps/deps/ready_to_reschedule.py | 15 +++++++++++----
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py
index 18ba0f8798..4355bd1dcf 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -25,6 +25,7 @@ DebugExecutor.
 from __future__ import annotations
 
 import threading
+import time
 from typing import Any
 
 from airflow.configuration import conf
@@ -120,6 +121,11 @@ class DebugExecutor(BaseExecutor):
 
         :param open_slots: Number of open slots
         """
+        if not self.queued_tasks:
+            # wait a bit if there are no tasks ready to be executed to avoid spinning too fast in the void
+            time.sleep(0.5)
+            return
+
         sorted_queue = sorted(
             self.queued_tasks.items(),
             key=lambda x: x[1][1],
diff --git a/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow/ti_deps/deps/ready_to_reschedule.py
index 467f08536e..66aa5c5613 100644
--- a/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
 from airflow.utils import timezone
@@ -44,10 +45,16 @@ class ReadyToRescheduleDep(BaseTIDep):
         from airflow.models.mappedoperator import MappedOperator
 
         is_mapped = isinstance(ti.task, MappedOperator)
-        if not is_mapped and not getattr(ti.task, "reschedule", False):
-            # Mapped sensors don't have the reschedule property (it can only
-            # be calculated after unmapping), so we don't check them here.
-            # They are handled below by checking TaskReschedule instead.
+        executor, _ = ExecutorLoader.import_default_executor_cls()
+        if (
+            # Mapped sensors don't have the reschedule property (it can only be calculated after unmapping),
+            # so we don't check them here. They are handled below by checking TaskReschedule instead.
+            not is_mapped
+            and not getattr(ti.task, "reschedule", False)
+            # Executors can force running in reschedule mode,
+            # in which case we ignore the value of the task property.
+            and not executor.change_sensor_mode_to_reschedule
+        ):
             yield self._passing_status(reason="Task is not in reschedule mode.")
             return
 

Reply via email to