This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c3bc645c1322fff970766633ed906891c213bfcc Author: Tzu-ping Chung <[email protected]> AuthorDate: Sun Jun 13 08:29:14 2021 +0800 Validate retries value on init for better errors (#16415) (cherry picked from commit 15ff2388e8a52348afcc923653f85ce15a3c5f71) --- airflow/models/baseoperator.py | 8 ++++++++ tests/core/test_core.py | 46 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index e243b5e..f74c5f9 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -563,6 +563,14 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta if wait_for_downstream: self.depends_on_past = True + if retries is not None and not isinstance(retries, int): + try: + parsed_retries = int(retries) + except (TypeError, ValueError): + raise AirflowException(f"'retries' type must be int, not {type(retries).__name__}") + self.log.warning("Implicitly converting 'retries' for %s from %r to int", self, retries) + retries = parsed_retries + self.retries = retries self.queue = queue self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool diff --git a/tests/core/test_core.py b/tests/core/test_core.py index 7149112..78f2676 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. +import logging import multiprocessing import os import signal @@ -453,3 +454,48 @@ class TestCore(unittest.TestCase): assert context1['params'] == {'key_1': 'value_1', 'key_2': 'value_2_new', 'key_3': 'value_3'} assert context2['params'] == {'key_1': 'value_1', 'key_2': 'value_2_old'} + + [email protected]() +def dag(): + return DAG(TEST_DAG_ID, default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE}) + + +def test_operator_retries_invalid(dag): + with pytest.raises(AirflowException) as ctx: + BashOperator( + task_id='test_illegal_args', + bash_command='echo success', + dag=dag, + retries='foo', + ) + assert str(ctx.value) == "'retries' type must be int, not str" + + +def test_operator_retries_coerce(caplog, dag): + with caplog.at_level(logging.WARNING): + BashOperator( + task_id='test_illegal_args', + bash_command='echo success', + dag=dag, + retries='1', + ) + assert caplog.record_tuples == [ + ( + "airflow.operators.bash.BashOperator", + logging.WARNING, + "Implicitly converting 'retries' for <Task(BashOperator): test_illegal_args> from '1' to int", + ), + ] + + [email protected]("retries", [None, 5]) +def test_operator_retries(caplog, dag, retries): + with caplog.at_level(logging.WARNING): + BashOperator( + task_id='test_illegal_args', + bash_command='echo success', + dag=dag, + retries=retries, + ) + assert caplog.records == []
