This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4678cf54b5cd651e0232a42746f9be80db43a609 Author: Kaxil Naik <kaxiln...@gmail.com> AuthorDate: Fri Jul 2 01:52:50 2021 +0100 Validate type of `priority_weight` during parsing (#16765) closes https://github.com/apache/airflow/issues/16762 Without this the scheduler crashes as validation does not happen at DAG Parsing time. (cherry picked from commit 9d170279a60d9d4ed513bae1c35999926f042662) --- airflow/models/baseoperator.py | 7 ++++++- tests/models/test_baseoperator.py | 5 +++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 10e8bfd..1fec8cf 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -586,10 +586,15 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta if isinstance(max_retry_delay, timedelta): self.max_retry_delay = max_retry_delay else: - self.log.debug("Max_retry_delay isn't timedelta object, assuming secs") + self.log.debug("max_retry_delay isn't a timedelta object, assuming secs") self.max_retry_delay = timedelta(seconds=max_retry_delay) self.params = params or {} # Available in templates! + if priority_weight is not None and not isinstance(priority_weight, int): + raise AirflowException( + f"`priority_weight` for task '{self.task_id}' only accepts integers, " + f"received '{type(priority_weight)}'." + ) self.priority_weight = priority_weight if not WeightRule.is_valid(weight_rule): raise AirflowException( diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index fa02b4e..04d3f54 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -109,6 +109,11 @@ class TestBaseOperator(unittest.TestCase): with pytest.raises(AirflowException, match='Argument.*test_param.*required'): DummyClass(default_args=default_args) + def test_incorrect_priority_weight(self): + error_msg = "`priority_weight` for task 'test_op' only accepts integers, received '<class 'str'>'." + with pytest.raises(AirflowException, match=error_msg): + DummyOperator(task_id="test_op", priority_weight="2") + @parameterized.expand( [ ("{{ foo }}", {"foo": "bar"}, "bar"),