This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2d2eedd0defae5c04f0c2df06f267d255ba27b70
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Jul 4 23:19:13 2025 +0530

    Fix task configuration defaults for AbstractOperator (#52871)
    
    Some defaults weren't being taken from configuration -- this is now fixed.
    
    (cherry picked from commit 04d2d3b8b8ce831d8725ca661a61f1e310d27eff)
---
 airflow-core/src/airflow/models/__init__.py        | 21 +++++++++++++
 .../src/airflow/models/abstractoperator.py         | 34 ----------------------
 .../unit/serialization/test_dag_serialization.py   |  2 ++
 .../providers/edge3/executors/edge_executor.py     |  6 +++-
 .../sdk/definitions/_internal/abstractoperator.py  | 20 ++++++++-----
 .../sdk/definitions/decorators/__init__.pyi        |  4 +--
 .../src/airflow/sdk/definitions/mappedoperator.py  | 10 ++-----
 7 files changed, 46 insertions(+), 51 deletions(-)

diff --git a/airflow-core/src/airflow/models/__init__.py 
b/airflow-core/src/airflow/models/__init__.py
index 9274ae7a79f..b5a8e61e49a 100644
--- a/airflow-core/src/airflow/models/__init__.py
+++ b/airflow-core/src/airflow/models/__init__.py
@@ -19,6 +19,8 @@
 
 from __future__ import annotations
 
+from airflow.utils.deprecation_tools import add_deprecated_classes
+
 # Do not add new models to this -- this is for compat only
 __all__ = [
     "DAG",
@@ -141,3 +143,22 @@ if TYPE_CHECKING:
     from airflow.sdk.bases.xcom import BaseXCom
     from airflow.sdk.definitions.param import Param
     from airflow.sdk.execution_time.xcom import XCom
+
+__deprecated_classes = {
+    "abstractoperator": {
+        "AbstractOperator": 
"airflow.sdk.definitions._internal.abstractoperator.AbstractOperator",
+        "NotMapped": 
"airflow.sdk.definitions._internal.abstractoperator.NotMapped",
+        "TaskStateChangeCallback": 
"airflow.sdk.definitions._internal.abstractoperator.TaskStateChangeCallback",
+        "DEFAULT_OWNER": 
"airflow.sdk.definitions._internal.abstractoperator.DEFAULT_OWNER",
+        "DEFAULT_QUEUE": 
"airflow.sdk.definitions._internal.abstractoperator.DEFAULT_QUEUE",
+        "DEFAULT_TASK_EXECUTION_TIMEOUT": 
"airflow.sdk.definitions._internal.abstractoperator.DEFAULT_TASK_EXECUTION_TIMEOUT",
+    },
+    "param": {
+        "Param": "airflow.sdk.definitions.param.Param",
+        "ParamsDict": "airflow.sdk.definitions.param.ParamsDict",
+    },
+    "baseoperatorlink": {
+        "BaseOperatorLink": "airflow.sdk.bases.operatorlink.BaseOperatorLink",
+    },
+}
+add_deprecated_classes(__deprecated_classes, __name__)
diff --git a/airflow-core/src/airflow/models/abstractoperator.py 
b/airflow-core/src/airflow/models/abstractoperator.py
deleted file mode 100644
index e5b5f7dc81f..00000000000
--- a/airflow-core/src/airflow/models/abstractoperator.py
+++ /dev/null
@@ -1,34 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import datetime
-
-from airflow.configuration import conf
-from airflow.sdk.definitions._internal.abstractoperator import (
-    AbstractOperator as AbstractOperator,
-    NotMapped as NotMapped,  # Re-export this for compat
-    TaskStateChangeCallback as TaskStateChangeCallback,
-)
-
-DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner")
-DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
-
-DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta(
-    "core", "default_task_execution_timeout"
-)
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py 
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index a7508b3ea52..f4a719496f2 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -187,6 +187,7 @@ serialized_simple_dag_ground_truth = {
                     "bash_command": "echo {{ task.task_id }}",
                     "task_type": "BashOperator",
                     "_task_module": 
"airflow.providers.standard.operators.bash",
+                    "owner": "airflow",
                     "pool": "default_pool",
                     "is_setup": False,
                     "is_teardown": False,
@@ -3163,6 +3164,7 @@ def test_handle_v1_serdag():
                         "_task_type": "BashOperator",
                        # Slight difference from v2-10-stable here; we 
manually changed this path
                         "_task_module": 
"airflow.providers.standard.operators.bash",
+                        "owner": "airflow",
                         "pool": "default_pool",
                         "is_setup": False,
                         "is_teardown": False,
diff --git 
a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py 
b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
index 55d5414cd89..20bf71623d1 100644
--- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
+++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
@@ -30,7 +30,11 @@ from sqlalchemy.orm import Session
 from airflow.cli.cli_config import GroupCommand
 from airflow.configuration import conf
 from airflow.executors.base_executor import BaseExecutor
-from airflow.models.abstractoperator import DEFAULT_QUEUE
+
+try:
+    from airflow.models.abstractoperator import DEFAULT_QUEUE
+except (ImportError, AttributeError):
+    from airflow.sdk.definitions._internal.abstractoperator import 
DEFAULT_QUEUE
 from airflow.models.taskinstance import TaskInstance, TaskInstanceState
 from airflow.providers.edge3.cli.edge_command import EDGE_COMMANDS
 from airflow.providers.edge3.models.edge_job import EdgeJobModel
diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py 
b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
index 95c8d9d8287..8934cd0e453 100644
--- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
@@ -51,7 +51,7 @@ if TYPE_CHECKING:
 
 TaskStateChangeCallback = Callable[[Context], None]
 
-DEFAULT_OWNER: str = "airflow"
+DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner")
 DEFAULT_POOL_SLOTS: int = 1
 DEFAULT_POOL_NAME = "default_pool"
 DEFAULT_PRIORITY_WEIGHT: int = 1
@@ -62,17 +62,23 @@ DEFAULT_PRIORITY_WEIGHT: int = 1
 MINIMUM_PRIORITY_WEIGHT: int = -2147483648
 MAXIMUM_PRIORITY_WEIGHT: int = 2147483647
 DEFAULT_EXECUTOR: str | None = None
-DEFAULT_QUEUE: str = conf.get("operators", "default_queue", "default")
+DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
 DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
 DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
-DEFAULT_RETRIES: int = 0
-DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
-MAX_RETRY_DELAY: int = 24 * 60 * 60
+DEFAULT_RETRIES: int = conf.getint("core", "default_task_retries", fallback=0)
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(
+    seconds=conf.getint("core", "default_task_retry_delay", fallback=300)
+)
+MAX_RETRY_DELAY: int = conf.getint("core", "max_task_retry_delay", fallback=24 
* 60 * 60)
 
 # TODO: Task-SDK -- these defaults should be overridable from the Airflow 
config
 DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
-DEFAULT_WEIGHT_RULE: WeightRule = WeightRule.DOWNSTREAM
-DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+DEFAULT_WEIGHT_RULE: WeightRule = WeightRule(
+    conf.get("core", "default_task_weight_rule", 
fallback=WeightRule.DOWNSTREAM)
+)
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta(
+    "core", "default_task_execution_timeout"
+)
 
 log = logging.getLogger(__name__)
 
diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi 
b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
index 30e921f2f48..038f94a0bd8 100644
--- a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
@@ -496,7 +496,7 @@ class TaskDecoratorCollection:
         """
         # [END decorator_signature]
     @overload
-    def kubernetes(
+    def kubernetes(  # type: ignore[misc]
         self,
         *,
         multiple_outputs: bool | None = None,
@@ -670,7 +670,7 @@ class TaskDecoratorCollection:
     @overload
     def kubernetes(self, python_callable: Callable[FParams, FReturn]) -> 
Task[FParams, FReturn]: ...
     @overload
-    def kubernetes_cmd(
+    def kubernetes_cmd(  # type: ignore[misc]
         self,
         *,
         args_only: bool = False,  # Added by _KubernetesCmdDecoratedOperator.
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py 
b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index cb24a7cc6bd..8c84ec6bbee 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -41,6 +41,7 @@ from airflow.sdk.definitions._internal.abstractoperator 
import (
     DEFAULT_WEIGHT_RULE,
     AbstractOperator,
     NotMapped,
+    TaskStateChangeCallback,
 )
 from airflow.sdk.definitions._internal.expandinput import (
     DictOfListsExpandInput,
@@ -50,7 +51,7 @@ from airflow.sdk.definitions._internal.expandinput import (
 from airflow.sdk.definitions._internal.types import NOTSET
 from airflow.serialization.enums import DagAttributeTypes
 from airflow.task.priority_strategy import PriorityWeightStrategy, 
validate_and_load_priority_weight_strategy
-from airflow.typing_compat import Literal
+from airflow.typing_compat import Literal, TypeGuard
 from airflow.utils.helpers import is_container, prevent_duplicates
 from airflow.utils.xcom import XCOM_RETURN_KEY
 
@@ -60,9 +61,6 @@ if TYPE_CHECKING:
     import jinja2  # Slow import.
     import pendulum
 
-    from airflow.models.abstractoperator import (
-        TaskStateChangeCallback,
-    )
     from airflow.models.expandinput import (
         OperatorExpandArgument,
         OperatorExpandKwargsArgument,
@@ -76,14 +74,12 @@ if TYPE_CHECKING:
     from airflow.sdk.types import Operator
     from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
     from airflow.triggers.base import StartTriggerArgs
-    from airflow.typing_compat import TypeGuard
     from airflow.utils.context import Context
     from airflow.utils.operator_resources import Resources
     from airflow.utils.task_group import TaskGroup
     from airflow.utils.trigger_rule import TriggerRule
 
-    TaskStateChangeCallbackAttrType = Union[None, TaskStateChangeCallback, 
list[TaskStateChangeCallback]]
-
+TaskStateChangeCallbackAttrType = Union[None, TaskStateChangeCallback, 
list[TaskStateChangeCallback]]
 ValidationSource = Union[Literal["expand"], Literal["partial"]]
 
 

Reply via email to