This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 041c9d27e373d8e778ce8f2b08f86b76a845d064 Author: Paul Vickers <[email protected]> AuthorDate: Fri Feb 26 11:42:00 2021 +0000 BugFix: Serialize max_retry_delay as a timedelta (#14436) closes: #13086, #14212 (cherry picked from commit 59c459fa2a6aafc133db4a89980fb3d3d0d25589) --- airflow/models/baseoperator.py | 9 ++++++++- airflow/serialization/schema.json | 1 + airflow/serialization/serialized_objects.py | 2 +- tests/serialization/test_dag_serialization.py | 4 ++++ 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 64ed4c5..06094a1 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -353,7 +353,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0), retry_delay: timedelta = timedelta(seconds=300), retry_exponential_backoff: bool = False, - max_retry_delay: Optional[datetime] = None, + max_retry_delay: Optional[timedelta] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, depends_on_past: bool = False, @@ -460,6 +460,13 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta self.retry_delay = timedelta(seconds=retry_delay) # noqa self.retry_exponential_backoff = retry_exponential_backoff self.max_retry_delay = max_retry_delay + if max_retry_delay: + if isinstance(max_retry_delay, timedelta): + self.max_retry_delay = max_retry_delay + else: + self.log.debug("Max_retry_delay isn't timedelta object, assuming secs") + self.max_retry_delay = timedelta(seconds=max_retry_delay) # noqa + self.params = params or {} # Available in templates! 
self.priority_weight = priority_weight if not WeightRule.is_valid(weight_rule): diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index c831334..0fbe20f 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -145,6 +145,7 @@ "execution_timeout": { "$ref": "#/definitions/timedelta" }, "retry_delay": { "$ref": "#/definitions/timedelta" }, "retry_exponential_backoff": { "type": "boolean" }, + "max_retry_delay": { "$ref": "#/definitions/timedelta" }, "params": { "$ref": "#/definitions/dict" }, "priority_weight": { "type": "number" }, "weight_rule": { "type": "string" }, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index a5e9f15..d609c09 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -452,7 +452,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization): v = set(v) elif k == "subdag": v = SerializedDAG.deserialize_dag(v) - elif k in {"retry_delay", "execution_timeout", "sla"}: + elif k in {"retry_delay", "execution_timeout", "sla", "max_retry_delay"}: v = cls._deserialize_timedelta(v) elif k in encoded_op["template_fields"]: pass diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 2046e22..a775f5b 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -60,6 +60,7 @@ serialized_simple_dag_ground_truth = { "depends_on_past": False, "retries": 1, "retry_delay": {"__type": "timedelta", "__var": 300.0}, + "max_retry_delay": {"__type": "timedelta", "__var": 600.0}, "sla": {"__type": "timedelta", "__var": 100.0}, }, }, @@ -85,6 +86,7 @@ serialized_simple_dag_ground_truth = { "owner": "airflow", "retries": 1, "retry_delay": 300.0, + "max_retry_delay": 600.0, "sla": 100.0, "_downstream_task_ids": [], "_inlets": [], @@ -113,6 +115,7 @@ 
serialized_simple_dag_ground_truth = { "task_id": "custom_task", "retries": 1, "retry_delay": 300.0, + "max_retry_delay": 600.0, "sla": 100.0, "_downstream_task_ids": [], "_inlets": [], @@ -160,6 +163,7 @@ def make_simple_dag(): default_args={ "retries": 1, "retry_delay": timedelta(minutes=5), + "max_retry_delay": timedelta(minutes=10), "depends_on_past": False, "sla": timedelta(seconds=100), },
