This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2d2eedd0defae5c04f0c2df06f267d255ba27b70 Author: Kaxil Naik <[email protected]> AuthorDate: Fri Jul 4 23:19:13 2025 +0530 Fix task configuration defaults for AbstractOperator (#52871) Some defaults weren't being taken from configuration -- this is now fixed. (cherry picked from commit 04d2d3b8b8ce831d8725ca661a61f1e310d27eff) --- airflow-core/src/airflow/models/__init__.py | 21 +++++++++++++ .../src/airflow/models/abstractoperator.py | 34 ---------------------- .../unit/serialization/test_dag_serialization.py | 2 ++ .../providers/edge3/executors/edge_executor.py | 6 +++- .../sdk/definitions/_internal/abstractoperator.py | 20 ++++++++----- .../sdk/definitions/decorators/__init__.pyi | 4 +-- .../src/airflow/sdk/definitions/mappedoperator.py | 10 ++----- 7 files changed, 46 insertions(+), 51 deletions(-) diff --git a/airflow-core/src/airflow/models/__init__.py b/airflow-core/src/airflow/models/__init__.py index 9274ae7a79f..b5a8e61e49a 100644 --- a/airflow-core/src/airflow/models/__init__.py +++ b/airflow-core/src/airflow/models/__init__.py @@ -19,6 +19,8 @@ from __future__ import annotations +from airflow.utils.deprecation_tools import add_deprecated_classes + # Do not add new models to this -- this is for compat only __all__ = [ "DAG", @@ -141,3 +143,22 @@ if TYPE_CHECKING: from airflow.sdk.bases.xcom import BaseXCom from airflow.sdk.definitions.param import Param from airflow.sdk.execution_time.xcom import XCom + +__deprecated_classes = { + "abstractoperator": { + "AbstractOperator": "airflow.sdk.definitions._internal.abstractoperator.AbstractOperator", + "NotMapped": "airflow.sdk.definitions._internal.abstractoperator.NotMapped", + "TaskStateChangeCallback": "airflow.sdk.definitions._internal.abstractoperator.TaskStateChangeCallback", + "DEFAULT_OWNER": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_OWNER", + "DEFAULT_QUEUE": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_QUEUE", + "DEFAULT_TASK_EXECUTION_TIMEOUT": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_TASK_EXECUTION_TIMEOUT", + }, + "param": { + "Param": "airflow.sdk.definitions.param.Param", + "ParamsDict": "airflow.sdk.definitions.param.ParamsDict", + }, + "baseoperatorlink": { + "BaseOperatorLink": "airflow.sdk.bases.operatorlink.BaseOperatorLink", + }, +} +add_deprecated_classes(__deprecated_classes, __name__) diff --git a/airflow-core/src/airflow/models/abstractoperator.py b/airflow-core/src/airflow/models/abstractoperator.py deleted file mode 100644 index e5b5f7dc81f..00000000000 --- a/airflow-core/src/airflow/models/abstractoperator.py +++ /dev/null @@ -1,34 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import datetime - -from airflow.configuration import conf -from airflow.sdk.definitions._internal.abstractoperator import ( - AbstractOperator as AbstractOperator, - NotMapped as NotMapped, # Re-export this for compat - TaskStateChangeCallback as TaskStateChangeCallback, -) - -DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner") -DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue") - -DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta( - "core", "default_task_execution_timeout" -) diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index a7508b3ea52..f4a719496f2 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -187,6 +187,7 @@ serialized_simple_dag_ground_truth = { "bash_command": "echo {{ task.task_id }}", "task_type": "BashOperator", "_task_module": "airflow.providers.standard.operators.bash", + "owner": "airflow", "pool": "default_pool", "is_setup": False, "is_teardown": False, @@ -3163,6 +3164,7 @@ def test_handle_v1_serdag(): "_task_type": "BashOperator", # Slightly difference from v2-10-stable here, we manually changed this path "_task_module": "airflow.providers.standard.operators.bash", + "owner": "airflow", "pool": "default_pool", "is_setup": False, "is_teardown": False, diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index 55d5414cd89..20bf71623d1 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -30,7 +30,11 @@ from sqlalchemy.orm import Session from airflow.cli.cli_config import GroupCommand from airflow.configuration import conf from airflow.executors.base_executor import BaseExecutor -from airflow.models.abstractoperator import DEFAULT_QUEUE + +try: + from airflow.models.abstractoperator import DEFAULT_QUEUE +except (ImportError, AttributeError): + from airflow.sdk.definitions._internal.abstractoperator import DEFAULT_QUEUE from airflow.models.taskinstance import TaskInstance, TaskInstanceState from airflow.providers.edge3.cli.edge_command import EDGE_COMMANDS from airflow.providers.edge3.models.edge_job import EdgeJobModel diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py index 95c8d9d8287..8934cd0e453 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py @@ -51,7 +51,7 @@ if TYPE_CHECKING: TaskStateChangeCallback = Callable[[Context], None] -DEFAULT_OWNER: str = "airflow" +DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner") DEFAULT_POOL_SLOTS: int = 1 DEFAULT_POOL_NAME = "default_pool" DEFAULT_PRIORITY_WEIGHT: int = 1 @@ -62,17 +62,23 @@ DEFAULT_PRIORITY_WEIGHT: int = 1 MINIMUM_PRIORITY_WEIGHT: int = -2147483648 MAXIMUM_PRIORITY_WEIGHT: int = 2147483647 DEFAULT_EXECUTOR: str | None = None -DEFAULT_QUEUE: str = conf.get("operators", "default_queue", "default") +DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue") DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False -DEFAULT_RETRIES: int = 0 -DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300) -MAX_RETRY_DELAY: int = 24 * 60 * 60 +DEFAULT_RETRIES: int = conf.getint("core", "default_task_retries", fallback=0) +DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta( + seconds=conf.getint("core", "default_task_retry_delay", fallback=300) +) +MAX_RETRY_DELAY: int = conf.getint("core", "max_task_retry_delay", fallback=24 * 60 * 60) # TODO: Task-SDK -- these defaults should be overridable from the Airflow config DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS -DEFAULT_WEIGHT_RULE: WeightRule = WeightRule.DOWNSTREAM -DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None +DEFAULT_WEIGHT_RULE: WeightRule = WeightRule( + conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM) +) +DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta( + "core", "default_task_execution_timeout" +) log = logging.getLogger(__name__) diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi index 30e921f2f48..038f94a0bd8 100644 --- a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi +++ b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi @@ -496,7 +496,7 @@ class TaskDecoratorCollection: """ # [END decorator_signature] @overload - def kubernetes( + def kubernetes( # type: ignore[misc] self, *, multiple_outputs: bool | None = None, @@ -670,7 +670,7 @@ class TaskDecoratorCollection: @overload def kubernetes(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... @overload - def kubernetes_cmd( + def kubernetes_cmd( # type: ignore[misc] self, *, args_only: bool = False, # Added by _KubernetesCmdDecoratedOperator. diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py index cb24a7cc6bd..8c84ec6bbee 100644 --- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -41,6 +41,7 @@ from airflow.sdk.definitions._internal.abstractoperator import ( DEFAULT_WEIGHT_RULE, AbstractOperator, NotMapped, + TaskStateChangeCallback, ) from airflow.sdk.definitions._internal.expandinput import ( DictOfListsExpandInput, @@ -50,7 +51,7 @@ from airflow.sdk.definitions._internal.expandinput import ( from airflow.sdk.definitions._internal.types import NOTSET from airflow.serialization.enums import DagAttributeTypes from airflow.task.priority_strategy import PriorityWeightStrategy, validate_and_load_priority_weight_strategy -from airflow.typing_compat import Literal +from airflow.typing_compat import Literal, TypeGuard from airflow.utils.helpers import is_container, prevent_duplicates from airflow.utils.xcom import XCOM_RETURN_KEY @@ -60,9 +61,6 @@ if TYPE_CHECKING: import jinja2 # Slow import. import pendulum - from airflow.models.abstractoperator import ( - TaskStateChangeCallback, - ) from airflow.models.expandinput import ( OperatorExpandArgument, OperatorExpandKwargsArgument, @@ -76,14 +74,12 @@ if TYPE_CHECKING: from airflow.sdk.types import Operator from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.triggers.base import StartTriggerArgs - from airflow.typing_compat import TypeGuard from airflow.utils.context import Context from airflow.utils.operator_resources import Resources from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule - TaskStateChangeCallbackAttrType = Union[None, TaskStateChangeCallback, list[TaskStateChangeCallback]] - +TaskStateChangeCallbackAttrType = Union[None, TaskStateChangeCallback, list[TaskStateChangeCallback]] ValidationSource = Union[Literal["expand"], Literal["partial"]]
