This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 1c56f9dd035 [v2-10-test] Fixed the endless reschedule (#45224) (#45250)
1c56f9dd035 is described below
commit 1c56f9dd03534eadec564603c72f02548831fa55
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Dec 27 23:50:56 2024 +0100
[v2-10-test] Fixed the endless reschedule (#45224) (#45250)
(cherry picked from commit 7f2b8ef5acf95b7fb8faa38a10caf94c043f5019)
Co-authored-by: morooshka <[email protected]>
---
airflow/sensors/base.py | 7 +-
tests/sensors/test_base.py | 285 ++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 288 insertions(+), 4 deletions(-)
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 7df76fae528..d6e1a6d8fc7 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -106,7 +106,10 @@ def _orig_start_date(
TaskReschedule.task_id == task_id,
TaskReschedule.run_id == run_id,
TaskReschedule.map_index == map_index,
- TaskReschedule.try_number == try_number,
+ # If the first try's record was never saved (an exception was raised and the transaction was
+ # rolled back), also match records of later tries (results are ordered by id, oldest first);
+ # otherwise no original start date is ever found and the sensor reschedules endlessly.
+ TaskReschedule.try_number >= try_number,
)
.order_by(TaskReschedule.id.asc())
.with_only_columns(TaskReschedule.start_date)
@@ -253,7 +256,7 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
max_tries: int = ti.max_tries or 0
retries: int = self.retries or 0
# If reschedule, use the start date of the first try (first try can be either the very
- # first execution of the task, or the first execution after the task was cleared.)
+ # first execution of the task, or the first execution after the task was cleared).
first_try_number = max_tries - retries + 1
start_date = _orig_start_date(
dag_id=ti.dag_id,
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 43c8fa5a64b..c227ccbf2cf 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -81,7 +81,7 @@ def task_reschedules_for_ti():
class DummySensor(BaseSensorOperator):
- def __init__(self, return_value=False, **kwargs):
+ def __init__(self, return_value: bool | None = False, **kwargs):
super().__init__(**kwargs)
self.return_value = return_value
@@ -421,7 +421,7 @@ class TestBaseSensor:
assert task_reschedules[0].try_number == 1
assert dummy_ti.state == State.NONE
- # second poke timesout and task instance is failed
+ # second poke times out and task instance is failed
time_machine.coordinates.shift(sensor.poke_interval)
with pytest.raises(AirflowSensorTimeout):
self._run(sensor)
@@ -877,6 +877,287 @@ class TestBaseSensor:
assert sensor_ti.max_tries == 4
assert sensor_ti.state == State.FAILED
+    def test_reschedule_set_retries_no_errors_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, no errors occur, and the sensor times out.
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=3)
+        Since no errors occur, no retries should be executed.
+        This is how it is expected to behave:
+        1. 00:00 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        2. 00:05 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        3. 00:10 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        4. 00:15 Raises AirflowSensorTimeout: try_number=1 max_tries=2 state=FAILED
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=3),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[False, False, False, False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1st poke
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 2nd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 2
+
+        # 3rd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+        # 4th poke causes timeout
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # The poke that timed out does not save a reschedule attempt
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+    def test_reschedule_set_retries_1st_poke_runtime_error_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, first poke raises RuntimeError, and the sensor times out.
+        A RuntimeError on the first poke means no reschedule attempt is recorded in the DB.
+        In that case the sensor must not loop endlessly; it should measure the running
+        timeout from:
+        1. The current system date on the second poke
+        2. The first saved reschedule attempt (of the second try) on subsequent pokes
+        See issue #45050 https://github.com/apache/airflow/issues/45050
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=7)
+        The try number is incremented on error, but the run timeout should not be extended.
+        This is how it is expected to behave:
+        1. 00:00 Raises RuntimeError: try_number=1 max_tries=2 state=UP_FOR_RETRY
+        2. 00:08 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
+           This is the first saved reschedule attempt; it is used to calculate the run duration
+        3. 00:13 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
+        4. 00:18 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
+        5. 00:23 Raises AirflowSensorTimeout: try_number=2 max_tries=2 state=FAILED
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=7),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[RuntimeError, False, False, False, False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1st poke
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+
+        with pytest.raises(RuntimeError):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RETRY
+
+        # On runtime error no reschedule attempt is saved
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 0
+
+        # Scheduler does this before retry
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 2nd poke
+        time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 3rd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 2
+
+        # 4th poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+        # 5th poke causes timeout
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # The poke that timed out does not save a reschedule attempt
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+    def test_reschedule_set_retries_2nd_poke_runtime_error_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, second poke raises RuntimeError, and the sensor times out.
+        A RuntimeError on the second poke means no reschedule attempt is recorded in the DB
+        for that poke.
+        In that case the running timeout calculation should not be affected, because the sensor
+        should use the first attempt as the start of execution.
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=7)
+        The try number is incremented on error, but the run timeout should not be extended.
+        This is how it is expected to behave:
+        1. 00:00 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        2. 00:05 Raises RuntimeError: try_number=1 max_tries=2 state=UP_FOR_RETRY
+        3. 00:13 Raises AirflowSensorTimeout: try_number=2 max_tries=2 state=FAILED
+           The run duration is measured from the 1st poke (00:00), so 13s exceeds timeout=10
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=7),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[False, RuntimeError, False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1st poke
+        # returns False; this saved reschedule attempt marks the start of execution
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 2nd poke
+        # raises RuntimeError, which sends the task to UP_FOR_RETRY
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(RuntimeError):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RETRY
+
+        # On runtime error no reschedule attempt is saved
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # Scheduler does this before retry
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 3rd poke causes timeout
+        # the run duration is still measured from the 1st poke, so it exceeds timeout=10
+        time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # The poke that timed out does not save a reschedule attempt
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 0
+
def test_reschedule_and_retry_timeout_and_silent_fail(self, make_sensor,
time_machine, session):
"""
Test mode="reschedule", silent_fail=True then retries and timeout
configurations interact correctly.