This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push: new 9d66c483a5d fix: rm `skip_if` and `run_if` in python source (#41832) (#45680) 9d66c483a5d is described below commit 9d66c483a5d012656b5a343e89d8bb538ea9644c Author: Josix <josixw...@gmail.com> AuthorDate: Thu Jan 16 02:17:47 2025 +0800 fix: rm `skip_if` and `run_if` in python source (#41832) (#45680) Co-authored-by: phi-friday <phi.fri...@gmail.com> --- airflow/utils/decorators.py | 2 +- tests/utils/test_decorators.py | 128 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py index 77a5eddaf08..e6981256ebb 100644 --- a/airflow/utils/decorators.py +++ b/airflow/utils/decorators.py @@ -81,7 +81,7 @@ def remove_task_decorator(python_source: str, task_decorator_name: str) -> str: after_decorator = after_decorator[1:] return before_decorator + after_decorator - decorators = ["@setup", "@teardown", task_decorator_name] + decorators = ["@setup", "@teardown", "@task.skip_if", "@task.run_if", task_decorator_name] for decorator in decorators: python_source = _remove_task_decorator(python_source, decorator) return python_source diff --git a/tests/utils/test_decorators.py b/tests/utils/test_decorators.py new file mode 100644 index 00000000000..19d3ec31d03 --- /dev/null +++ b/tests/utils/test_decorators.py @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from airflow.decorators import task + +if TYPE_CHECKING: + from airflow.decorators.base import Task, TaskDecorator + +_CONDITION_DECORATORS = frozenset({"skip_if", "run_if"}) +_NO_SOURCE_DECORATORS = frozenset({"sensor"}) +DECORATORS = sorted( + set(x for x in dir(task) if not x.startswith("_")) - _CONDITION_DECORATORS - _NO_SOURCE_DECORATORS +) +DECORATORS_USING_SOURCE = ("external_python", "virtualenv", "branch_virtualenv", "branch_external_python") + + +@pytest.fixture +def decorator(request: pytest.FixtureRequest) -> TaskDecorator: + decorator_factory = getattr(task, request.param) + + kwargs = {} + if "external" in request.param: + kwargs["python"] = "python3" + return decorator_factory(**kwargs) + + +@pytest.mark.parametrize("decorator", DECORATORS_USING_SOURCE, indirect=["decorator"]) +def test_task_decorator_using_source(decorator: TaskDecorator): + @decorator + def f(): + return ["some_task"] + + assert parse_python_source(f, "decorator") == 'def f():\n return ["some_task"]\n' + + +@pytest.mark.parametrize("decorator", DECORATORS, indirect=["decorator"]) +def test_skip_if(decorator: TaskDecorator): + @task.skip_if(lambda context: True) + @decorator + def f(): + return "hello world" + + assert parse_python_source(f, "decorator") == 'def f():\n return "hello world"\n' + + +@pytest.mark.parametrize("decorator", DECORATORS, indirect=["decorator"]) +def test_run_if(decorator: TaskDecorator): + @task.run_if(lambda context: True) + @decorator + def f(): + return 
"hello world" + + assert parse_python_source(f, "decorator") == 'def f():\n return "hello world"\n' + + +def test_skip_if_and_run_if(): + @task.skip_if(lambda context: True) + @task.run_if(lambda context: True) + @task.virtualenv() + def f(): + return "hello world" + + assert parse_python_source(f) == 'def f():\n return "hello world"\n' + + +def test_run_if_and_skip_if(): + @task.run_if(lambda context: True) + @task.skip_if(lambda context: True) + @task.virtualenv() + def f(): + return "hello world" + + assert parse_python_source(f) == 'def f():\n return "hello world"\n' + + +def test_skip_if_allow_decorator(): + def non_task_decorator(func): + return func + + @task.skip_if(lambda context: True) + @task.virtualenv() + @non_task_decorator + def f(): + return "hello world" + + assert parse_python_source(f) == '@non_task_decorator\ndef f():\n return "hello world"\n' + + +def test_run_if_allow_decorator(): + def non_task_decorator(func): + return func + + @task.run_if(lambda context: True) + @task.virtualenv() + @non_task_decorator + def f(): + return "hello world" + + assert parse_python_source(f) == '@non_task_decorator\ndef f():\n return "hello world"\n' + + +def parse_python_source(task: Task, custom_operator_name: str | None = None) -> str: + operator = task().operator + if custom_operator_name: + custom_operator_name = ( + custom_operator_name if custom_operator_name.startswith("@") else f"@{custom_operator_name}" + ) + operator.__dict__["custom_operator_name"] = custom_operator_name + return operator.get_python_source()