This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 1c56f9dd035 [v2-10-test] Fixed the endless reschedule (#45224) (#45250)
1c56f9dd035 is described below

commit 1c56f9dd03534eadec564603c72f02548831fa55
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Dec 27 23:50:56 2024 +0100

    [v2-10-test] Fixed the endless reschedule (#45224) (#45250)
    
    (cherry picked from commit 7f2b8ef5acf95b7fb8faa38a10caf94c043f5019)
    
    Co-authored-by: morooshka <[email protected]>
---
 airflow/sensors/base.py    |   7 +-
 tests/sensors/test_base.py | 285 ++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 288 insertions(+), 4 deletions(-)

diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 7df76fae528..d6e1a6d8fc7 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -106,7 +106,10 @@ def _orig_start_date(
             TaskReschedule.task_id == task_id,
             TaskReschedule.run_id == run_id,
             TaskReschedule.map_index == map_index,
-            TaskReschedule.try_number == try_number,
+            # If the first try's record was not saved because an exception 
occurred and the
+            # transaction was rolled back, the next available attempt should be taken
+            # to prevent falling into endless rescheduling
+            TaskReschedule.try_number >= try_number,
         )
         .order_by(TaskReschedule.id.asc())
         .with_only_columns(TaskReschedule.start_date)
@@ -253,7 +256,7 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
             max_tries: int = ti.max_tries or 0
             retries: int = self.retries or 0
             # If reschedule, use the start date of the first try (first try 
can be either the very
-            # first execution of the task, or the first execution after the 
task was cleared.)
+            # first execution of the task, or the first execution after the 
task was cleared).
             first_try_number = max_tries - retries + 1
             start_date = _orig_start_date(
                 dag_id=ti.dag_id,
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 43c8fa5a64b..c227ccbf2cf 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -81,7 +81,7 @@ def task_reschedules_for_ti():
 
 
 class DummySensor(BaseSensorOperator):
-    def __init__(self, return_value=False, **kwargs):
+    def __init__(self, return_value: bool | None = False, **kwargs):
         super().__init__(**kwargs)
         self.return_value = return_value
 
@@ -421,7 +421,7 @@ class TestBaseSensor:
         assert task_reschedules[0].try_number == 1
         assert dummy_ti.state == State.NONE
 
-        # second poke timesout and task instance is failed
+        # second poke times out and task instance is failed
         time_machine.coordinates.shift(sensor.poke_interval)
         with pytest.raises(AirflowSensorTimeout):
             self._run(sensor)
@@ -877,6 +877,287 @@ class TestBaseSensor:
         assert sensor_ti.max_tries == 4
         assert sensor_ti.state == State.FAILED
 
+    def test_reschedule_set_retries_no_errors_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, but no errors occurred, and the sensor times out.
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 
retry_delay=timedelta(seconds=3)
+        If no errors occurred, no retries should be executed.
+        This is how it is expected to behave:
+        1. 00:00 Returns False: try_number=1 max_tries=2 
state=UP_FOR_RESCHEDULE
+        2. 00:05 Returns False: try_number=1 max_tries=2 
state=UP_FOR_RESCHEDULE
+        3. 00:10 Returns False: try_number=1 max_tries=2 
state=UP_FOR_RESCHEDULE
+        4. 00:15 Raises AirflowSensorTimeout: try_number=1 max_tries=2 
state=FAILED
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=3),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if 
x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[False, False, False, False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1-st poke
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 2-nd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 2
+
+        # 3-rd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+        # 4-th poke causes timeout
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # No reschedule attempt is saved for the poke that fails by timeout
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+    def test_reschedule_set_retries_1st_poke_runtime_error_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, first poke raises RuntimeError, then 
the sensor times out.
+        RuntimeError on first iteration results in no reschedule attempt 
recorded in DB.
+        In that case sensor should not result in endless loop and should 
calculate running
+        timeout from:
+        1. The current system date on the second poke
+        2. From the second attempt on subsequent attempts
+        See issue #45050 https://github.com/apache/airflow/issues/45050
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 
retry_delay=timedelta(seconds=7)
+        Number of retries incremented on error, but run timeout should not be 
extended.
+        This is how it is expected to behave:
+        1. 00:00 Raises RuntimeError: try_number=1 max_tries=2 
state=UP_FOR_RETRY
+        2. 00:07 Returns False: try_number=2 max_tries=2 
state=UP_FOR_RESCHEDULE
+           This is a first saved reschedule attempt, and it is used to 
calculate the run duration
+        3. 00:12 Returns False: try_number=2 max_tries=2 
state=UP_FOR_RESCHEDULE
+        4. 00:17 Returns False: try_number=2 max_tries=2 
state=UP_FOR_RESCHEDULE
+        5. 00:23 Raises AirflowSensorTimeout: try_number=2 max_tries=2 
state=FAILED
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=7),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if 
x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[RuntimeError, False, False, False, 
False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1-st poke
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+
+        with pytest.raises(RuntimeError):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RETRY
+
+        # On runtime error no reschedule attempt saved
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 0
+
+        # Scheduler does this before retry
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 2-nd poke
+        time_machine.coordinates.shift(sensor.retry_delay + 
timedelta(seconds=1))
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 3-rd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 2
+
+        # 4-th poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+        # 5-th poke causes timeout
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # No reschedule attempt is saved for the poke that fails by timeout
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+    def test_reschedule_set_retries_2nd_poke_runtime_error_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, second poke raises RuntimeError, then 
the sensor times out.
+        RuntimeError on second iteration results in no reschedule for the 
second attempt
+        recorded in DB.
+        In that case running timeout calculation should not be affected 
because sensor
+        should use the first attempt as the start of execution.
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 
retry_delay=timedelta(seconds=7)
+        Number of retries incremented on error, but run timeout should not be 
extended.
+        This is how it is expected to behave:
+        00:00 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        00:05 Raises RuntimeError: try_number=1 max_tries=2 state=UP_FOR_RETRY
+        00:12 Raises AirflowSensorTimeout: try_number=2 max_tries=2 
state=FAILED
+
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=7),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if 
x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[False, RuntimeError, False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1-st poke
+        print("**1" * 40)
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 2-nd poke
+        print("**2" * 40)
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(RuntimeError):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RETRY
+
+        # On runtime error no reschedule attempt saved
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # Scheduler does this before retry
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 3-rd poke causes timeout
+        print("*3*" * 40)
+        time_machine.coordinates.shift(sensor.retry_delay + 
timedelta(seconds=1))
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # No reschedule attempt is saved for the poke that fails by timeout
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 0
+
     def test_reschedule_and_retry_timeout_and_silent_fail(self, make_sensor, 
time_machine, session):
         """
         Test mode="reschedule", silent_fail=True then retries and timeout 
configurations interact correctly.

Reply via email to