This is an automated email from the ASF dual-hosted git repository. onikolas pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new a35ec9549d Fixes to how DebugExecutor handles sensors (#28528) a35ec9549d is described below commit a35ec9549dc0b4b311f126f3022309ec5b33fa62 Author: Raphaël Vandon <114772123+vandonr-...@users.noreply.github.com> AuthorDate: Tue Jan 24 21:40:10 2023 -0800 Fixes to how DebugExecutor handles sensors (#28528) * move fix to ready_to_reschedule * replace check on debug exec with new property Co-authored-by: Tzu-ping Chung <uranu...@gmail.com> --- airflow/executors/debug_executor.py | 6 ++++++ airflow/ti_deps/deps/ready_to_reschedule.py | 15 +++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index 18ba0f8798..4355bd1dcf 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -25,6 +25,7 @@ DebugExecutor. from __future__ import annotations import threading +import time from typing import Any from airflow.configuration import conf @@ -120,6 +121,11 @@ class DebugExecutor(BaseExecutor): :param open_slots: Number of open slots """ + if not self.queued_tasks: + # wait a bit if there are no tasks ready to be executed to avoid spinning too fast in the void + time.sleep(0.5) + return + sorted_queue = sorted( self.queued_tasks.items(), key=lambda x: x[1][1], diff --git a/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow/ti_deps/deps/ready_to_reschedule.py index 467f08536e..66aa5c5613 100644 --- a/airflow/ti_deps/deps/ready_to_reschedule.py +++ b/airflow/ti_deps/deps/ready_to_reschedule.py @@ -17,6 +17,7 @@ # under the License. 
from __future__ import annotations +from airflow.executors.executor_loader import ExecutorLoader from airflow.models.taskreschedule import TaskReschedule from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.utils import timezone @@ -44,10 +45,16 @@ class ReadyToRescheduleDep(BaseTIDep): from airflow.models.mappedoperator import MappedOperator is_mapped = isinstance(ti.task, MappedOperator) - if not is_mapped and not getattr(ti.task, "reschedule", False): - # Mapped sensors don't have the reschedule property (it can only - # be calculated after unmapping), so we don't check them here. - # They are handled below by checking TaskReschedule instead. + executor, _ = ExecutorLoader.import_default_executor_cls() + if ( + # Mapped sensors don't have the reschedule property (it can only be calculated after unmapping), + # so we don't check them here. They are handled below by checking TaskReschedule instead. + not is_mapped + and not getattr(ti.task, "reschedule", False) + # Executors can force running in reschedule mode, + # in which case we ignore the value of the task property. + and not executor.change_sensor_mode_to_reschedule + ): yield self._passing_status(reason="Task is not in reschedule mode.") return