This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0ddc90f12020cd7197189a80ea56a7ec0a81273b
Author: David Caron <dcaro...@gmail.com>
AuthorDate: Tue May 17 08:18:29 2022 -0400

    Add 'reschedule' to the serialized fields for the BaseSensorOperator (#23674)
    
    fix #23411
    
    (cherry picked from commit f9e2a3051cd3a5b6fcf33bca4c929d220cf5661e)
---
 airflow/sensors/base.py                            | 4 ++++
 tests/serialization/test_dag_serialization.py      | 1 +
 tests/ti_deps/deps/test_ready_to_reschedule_dep.py | 7 ++++++-
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 7f1cd87c3d..f00b3a6761 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -339,6 +339,10 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
         """Define mode rescheduled sensors."""
         return self.mode == 'reschedule'
 
+    @classmethod
+    def get_serialized_fields(cls):
+        return super().get_serialized_fields() | {"reschedule"}
+
 
 def poke_mode_only(cls):
     """
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 0144501f1a..fe9fc7c7e5 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1462,6 +1462,7 @@ class TestStringifiedDAGs:
         assert "deps" in blob
 
         serialized_op = SerializedBaseOperator.deserialize_operator(blob)
+        assert serialized_op.reschedule == (mode == "reschedule")
         assert op.deps == serialized_op.deps
 
     @pytest.mark.parametrize(
diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
index 470166db21..99416bbbc8 100644
--- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -31,7 +31,7 @@ from airflow.utils.timezone import utcnow
 class TestNotInReschedulePeriodDep(unittest.TestCase):
     def _get_task_instance(self, state):
         dag = DAG('test_dag')
-        task = Mock(dag=dag)
+        task = Mock(dag=dag, reschedule=True)
         ti = TaskInstance(task=task, state=state, run_id=None)
         return ti
 
@@ -52,6 +52,11 @@ class TestNotInReschedulePeriodDep(unittest.TestCase):
         dep_context = DepContext(ignore_in_reschedule_period=True)
         assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
 
+    def test_should_pass_if_not_reschedule_mode(self):
+        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
+        del ti.task.reschedule
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+
     def test_should_pass_if_not_in_none_state(self):
         ti = self._get_task_instance(State.UP_FOR_RETRY)
         assert ReadyToRescheduleDep().is_met(ti=ti)

Reply via email to