jscheffl commented on code in PR #43076:
URL: https://github.com/apache/airflow/pull/43076#discussion_r1803630571


##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+    Collection,
+    Iterable,
+)
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+    import jinja2  # Slow import.
+
+    from airflow.models.baseoperatorlink import BaseOperatorLink
+    from airflow.sdk.definitions.baseoperator import BaseOperator
+    from airflow.sdk.definitions.dag import DAG
+    from airflow.task.priority_strategy import PriorityWeightStrategy
+
+    # TODO: Task-SDK
+    Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+    """Raise if a task is neither mapped nor has any parent mapped groups."""

Review Comment:
   Should we consolidate all exceptions into one module like we have it today 
or keep the exceptions per module?
   Any base class for the SDK like we have today `AirflowException`?



##########
airflow/models/dag.py:
##########
@@ -1076,17 +768,9 @@ def access_control(self):
     def access_control(self, value):
         self._access_control = DAG._upgrade_outdated_dag_access_control(value)
 
-    @property
-    def dag_display_name(self) -> str:
-        return self._dag_display_property_value or self._dag_id
-
-    @property
-    def description(self) -> str | None:
-        return self._description
-
     @property
     def default_view(self) -> str:
-        return self._default_view
+        return "grid"

Review Comment:
   Oops, why this? Is this just to keep legacy?



##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+    Collection,
+    Iterable,
+)
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+    import jinja2  # Slow import.
+
+    from airflow.models.baseoperatorlink import BaseOperatorLink
+    from airflow.sdk.definitions.baseoperator import BaseOperator
+    from airflow.sdk.definitions.dag import DAG
+    from airflow.task.priority_strategy import PriorityWeightStrategy
+
+    # TODO: Task-SDK
+    Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+    """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
+class AbstractOperator(DAGNode):
+    """
+    Common implementation for operators, including unmapped and mapped.
+
+    This base class is more about sharing implementations, not defining a 
common
+    interface. Unfortunately it's difficult to use this as the common base 
class
+    for typing due to BaseOperator carrying too much historical baggage.
+
+    The union type ``from airflow.models.operator import Operator`` is easier
+    to use for typing purposes.
+
+    :meta private:
+    """
+
+    operator_class: type[BaseOperator] | dict[str, Any]
+
+    weight_rule: PriorityWeightStrategy
+    priority_weight: int
+
+    # Defines the operator level extra links.
+    operator_extra_links: Collection[BaseOperatorLink]
+
+    owner: str
+    task_id: str
+
+    outlets: list
+    inlets: list
+    # TODO:
+    # trigger_rule: TriggerRule
+    _needs_expansion: bool | None = None
+    _on_failure_fail_dagrun = False
+
+    HIDE_ATTRS_FROM_UI: ClassVar[frozenset[str]] = frozenset(
+        (
+            "log",
+            "dag",  # We show dag_id, don't need to show this too
+            "node_id",  # Duplicates task_id
+            "task_group",  # Doesn't have a useful repr, no point showing in UI
+            "inherits_from_empty_operator",  # impl detail
+            # Decide whether to start task execution from triggerer
+            "start_trigger_args",
+            "start_from_trigger",
+            # For compatibility with TG, for operators these are just the 
current task, no point showing
+            "roots",
+            "leaves",
+            # These lists are already shown via *_task_ids
+            "upstream_list",
+            "downstream_list",
+            # Not useful, implementation detail, already shown elsewhere
+            "global_operator_extra_link_dict",
+            "operator_extra_link_dict",
+        )
+    )
+
+    def get_dag(self) -> DAG | None:
+        raise NotImplementedError()
+
+    @property
+    def task_type(self) -> str:
+        raise NotImplementedError()
+
+    @property
+    def operator_name(self) -> str:
+        raise NotImplementedError()
+
+    @property
+    def inherits_from_empty_operator(self) -> bool:

Review Comment:
   If I see this I feel there is a lot of stuff that would be cool also to 
clean-up... but I assume this would be too many breaking changes?



##########
uv.lock:
##########


Review Comment:
   Is the lock file checked-in as a feature? or is this a bug? (Maybe I am 
lacking a bit of UV understanding)



##########
airflow/task/priority_strategy.py:
##########
@@ -150,5 +148,5 @@ def validate_and_load_priority_weight_strategy(
         priority_weight_strategy_class = qualname(priority_weight_strategy)
     loaded_priority_weight_strategy = 
_get_registered_priority_weight_strategy(priority_weight_strategy_class)
     if loaded_priority_weight_strategy is None:
-        raise AirflowException(f"Unknown priority strategy 
{priority_weight_strategy_class}")
+        raise ValueError(f"Unknown priority strategy 
{priority_weight_strategy_class}")

Review Comment:
   I'd propose to move this cleanup - which is certainly useful - to a separate 
PR - to separate-out cleanup/functional changes with the SDK rework.



##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+    Collection,
+    Iterable,
+)
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+    import jinja2  # Slow import.
+
+    from airflow.models.baseoperatorlink import BaseOperatorLink
+    from airflow.sdk.definitions.baseoperator import BaseOperator
+    from airflow.sdk.definitions.dag import DAG
+    from airflow.task.priority_strategy import PriorityWeightStrategy
+
+    # TODO: Task-SDK
+    Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+    """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
+class AbstractOperator(DAGNode):
+    """
+    Common implementation for operators, including unmapped and mapped.
+
+    This base class is more about sharing implementations, not defining a 
common
+    interface. Unfortunately it's difficult to use this as the common base 
class
+    for typing due to BaseOperator carrying too much historical baggage.
+
+    The union type ``from airflow.models.operator import Operator`` is easier
+    to use for typing purposes.
+
+    :meta private:
+    """
+
+    operator_class: type[BaseOperator] | dict[str, Any]

Review Comment:
   What is this for? We will carry the class of the instance in the abstract 
base as a member? Would this not be something that is resolved during runtime?



##########
task_sdk/src/airflow/sdk/__init__.py:
##########
@@ -16,6 +16,32 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
 
-def hello() -> str:
-    return "Hello from task-sdk!"
+__all__ = ["DAG", "BaseOperator", "TaskGroup"]
+
+if TYPE_CHECKING:
+    from airflow.sdk.definitions.baseoperator import BaseOperator as 
BaseOperator
+    from airflow.sdk.definitions.dag import DAG as DAG
+    from airflow.sdk.definitions.edges import EdgeModifier as EdgeModifier
+    from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup
+
+__lazy_imports: dict[str, str] = {
+    "DAG": ".definitions.dag",
+    "BaseOperator": ".definitions.baseoperator",
+    "TaskGroup": ".definitions.taskgroup",
+    "EdgeModifier": ".definitions.edges",
+}
+
+
+def __getattr__(name: str):

Review Comment:
   LIKE!



##########
task_sdk/src/airflow/sdk/definitions/baseoperator.py:
##########
@@ -0,0 +1,1141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import abc
+import collections.abc
+import contextlib
+import copy
+import inspect
+import warnings
+from collections.abc import Iterable, Sequence
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from functools import total_ordering, wraps
+from types import FunctionType
+from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, cast
+
+import attrs
+
+from airflow.exceptions import FailStopDagInvalidTriggerRule
+from airflow.sdk.definitions.abstractoperator import (
+    DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
+    DEFAULT_OWNER,
+    DEFAULT_POOL_SLOTS,
+    DEFAULT_PRIORITY_WEIGHT,
+    DEFAULT_QUEUE,
+    DEFAULT_RETRIES,
+    DEFAULT_RETRY_DELAY,
+    DEFAULT_TASK_EXECUTION_TIMEOUT,
+    DEFAULT_TRIGGER_RULE,
+    DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
+    DEFAULT_WEIGHT_RULE,
+    AbstractOperator,
+)
+from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack
+from airflow.sdk.definitions.node import validate_key
+from airflow.sdk.types import NOTSET, validate_instance_args
+from airflow.task.priority_strategy import PriorityWeightStrategy, 
validate_and_load_priority_weight_strategy
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.types import AttributeRemoved
+
+T = TypeVar("T", bound=FunctionType)
+
+if TYPE_CHECKING:
+    # from ..execution_time.context import Context
+    class Context: ...
+
+    class ParamsDict: ...
+
+    from airflow.sdk.definitions.dag import DAG
+    from airflow.utils.operator_resources import Resources
+
+from airflow.sdk.definitions.taskgroup import TaskGroup
+
+# TODO: Task-SDK
+AirflowException = RuntimeError
+ParamsDict = dict
+
+
+def _get_parent_defaults(dag: DAG | None, task_group: TaskGroup | None) -> 
tuple[dict, ParamsDict]:
+    if not dag:
+        return {}, ParamsDict()
+    dag_args = copy.copy(dag.default_args)
+    dag_params = copy.deepcopy(dag.params)
+    if task_group:
+        if task_group.default_args and not isinstance(task_group.default_args, 
collections.abc.Mapping):
+            raise TypeError("default_args must be a mapping")
+        dag_args.update(task_group.default_args)
+    return dag_args, dag_params
+
+
+def get_merged_defaults(
+    dag: DAG | None,
+    task_group: TaskGroup | None,
+    task_params: collections.abc.MutableMapping | None,
+    task_default_args: dict | None,
+) -> tuple[dict, ParamsDict]:
+    args, params = _get_parent_defaults(dag, task_group)
+    if task_params:
+        if not isinstance(task_params, collections.abc.Mapping):
+            raise TypeError(f"params must be a mapping, got 
{type(task_params)}")
+        params.update(task_params)
+    if task_default_args:
+        if not isinstance(task_default_args, collections.abc.Mapping):
+            raise TypeError(f"default_args must be a mapping, got 
{type(task_params)}")
+        args.update(task_default_args)
+        with contextlib.suppress(KeyError):
+            params.update(task_default_args["params"] or {})
+    return args, params
+
+
+class BaseOperatorMeta(abc.ABCMeta):
+    """Metaclass of BaseOperator."""
+
+    @classmethod
+    def _apply_defaults(cls, func: T) -> T:
+        """
+        Look for an argument named "default_args", and fill the unspecified 
arguments from it.
+
+        Since python2.* isn't clear about which arguments are missing when
+        calling a function, and that this can be quite confusing with 
multi-level
+        inheritance and argument defaults, this decorator also alerts with
+        specific information about the missing arguments.
+        """
+        # Cache inspect.signature for the wrapper closure to avoid calling it
+        # at every decorated invocation. This is separate sig_cache created
+        # per decoration, i.e. each function decorated using apply_defaults 
will
+        # have a different sig_cache.
+        sig_cache = inspect.signature(func)
+        non_variadic_params = {
+            name: param
+            for (name, param) in sig_cache.parameters.items()
+            if param.name != "self" and param.kind not in 
(param.VAR_POSITIONAL, param.VAR_KEYWORD)
+        }
+        non_optional_args = {
+            name
+            for name, param in non_variadic_params.items()
+            if param.default == param.empty and name != "task_id"
+        }
+
+        fixup_decorator_warning_stack(func)
+
+        @wraps(func)
+        def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> 
Any:
+            from airflow.sdk.definitions.contextmanager import DagContext, 
TaskGroupContext
+
+            if args:
+                raise TypeError("Use keyword arguments when initializing 
operators")
+
+            instantiated_from_mapped = kwargs.pop(
+                "_airflow_from_mapped",
+                getattr(self, "_BaseOperator__from_mapped", False),
+            )
+
+            dag: DAG | None = kwargs.get("dag") or DagContext.get_current()
+            task_group: TaskGroup | None = kwargs.get("task_group")
+            if dag and not task_group:
+                task_group = TaskGroupContext.get_current(dag)
+
+            default_args, merged_params = get_merged_defaults(
+                dag=dag,
+                task_group=task_group,
+                task_params=kwargs.pop("params", None),
+                task_default_args=kwargs.pop("default_args", None),
+            )
+
+            for arg in sig_cache.parameters:
+                if arg not in kwargs and arg in default_args:
+                    kwargs[arg] = default_args[arg]
+
+            missing_args = non_optional_args.difference(kwargs)
+            if len(missing_args) == 1:
+                raise TypeError(f"missing keyword argument 
{missing_args.pop()!r}")
+            elif missing_args:
+                display = ", ".join(repr(a) for a in sorted(missing_args))
+                raise TypeError(f"missing keyword arguments {display}")
+
+            if merged_params:
+                kwargs["params"] = merged_params
+
+            hook = getattr(self, "_hook_apply_defaults", None)
+            if hook:
+                args, kwargs = hook(**kwargs, default_args=default_args)
+                default_args = kwargs.pop("default_args", {})
+
+            if not hasattr(self, "_BaseOperator__init_kwargs"):
+                object.__setattr__(self, "_BaseOperator__init_kwargs", {})
+            object.__setattr__(self, "_BaseOperator__from_mapped", 
instantiated_from_mapped)
+
+            result = func(self, **kwargs, default_args=default_args)
+
+            # Store the args passed to init -- we need them to support 
task.map serialization!
+            self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
+
+            # Set upstream task defined by XComArgs passed to template fields 
of the operator.
+            # BUT: only do this _ONCE_, not once for each class in the 
hierarchy
+            if not instantiated_from_mapped and func == 
self.__init__.__wrapped__:  # type: ignore[misc]
+                self._set_xcomargs_dependencies()
+                # Mark instance as instantiated so that futre attr setting 
updates xcomarg-based deps.
+                object.__setattr__(self, "_BaseOperator__instantiated", True)
+
+            return result
+
+        apply_defaults.__non_optional_args = non_optional_args  # type: ignore
+        apply_defaults.__param_names = set(non_variadic_params)  # type: ignore
+
+        return cast(T, apply_defaults)
+
+    def __new__(cls, name, bases, namespace, **kwargs):
+        # TODO: Task-SDK
+        # execute_method = namespace.get("execute")
+        # if callable(execute_method) and not getattr(execute_method, 
"__isabstractmethod__", False):
+        #     namespace["execute"] = 
ExecutorSafeguard().decorator(execute_method)
+        new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
+        with contextlib.suppress(KeyError):
+            # Update the partial descriptor with the class method, so it calls 
the actual function
+            # (but let subclasses override it if they need to)
+            # TODO: Task-SDK
+            # partial_desc = vars(new_cls)["partial"]
+            # if isinstance(partial_desc, _PartialDescriptor):
+            #     partial_desc.class_method = classmethod(partial)
+            ...
+
+        # We patch `__init__` only if the class defines it.
+        if inspect.getmro(new_cls)[1].__init__ is not new_cls.__init__:
+            new_cls.__init__ = cls._apply_defaults(new_cls.__init__)
+
+        return new_cls
+
+
+# TODO: The following mapping is used to validate that the arguments passed to 
the BaseOperator are of the
+#  correct type. This is a temporary solution until we find a more 
sophisticated method for argument
+#  validation. One potential method is to use `get_type_hints` from the typing 
module. However, this is not
+#  fully compatible with future annotations for Python versions below 3.10. 
Once we require a minimum Python
+#  version that supports `get_type_hints` effectively or find a better 
approach, we can replace this
+#  manual type-checking method.

Review Comment:
   Just thinking out loud... I assume Pydantic is too heavy in here and also 
would add more complexity than bringing benefits... would it be an option to 
define a JSON schema for these and use the `__dict__` to validate against? Then 
the JSON schema could be used cross-language as well.... but never mind... later



##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+    Collection,
+    Iterable,
+)
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+    import jinja2  # Slow import.
+
+    from airflow.models.baseoperatorlink import BaseOperatorLink
+    from airflow.sdk.definitions.baseoperator import BaseOperator
+    from airflow.sdk.definitions.dag import DAG
+    from airflow.task.priority_strategy import PriorityWeightStrategy
+
+    # TODO: Task-SDK
+    Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+    """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
+class AbstractOperator(DAGNode):
+    """
+    Common implementation for operators, including unmapped and mapped.
+
+    This base class is more about sharing implementations, not defining a 
common
+    interface. Unfortunately it's difficult to use this as the common base 
class
+    for typing due to BaseOperator carrying too much historical baggage.

Review Comment:
   Would it be an alternative to push shared implementations to a Mixin? (or is 
this killing typing?)



##########
task_sdk/src/airflow/sdk/exceptions.py:
##########


Review Comment:
   Placeholder?



##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+    Collection,
+    Iterable,
+)
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+    import jinja2  # Slow import.
+
+    from airflow.models.baseoperatorlink import BaseOperatorLink
+    from airflow.sdk.definitions.baseoperator import BaseOperator
+    from airflow.sdk.definitions.dag import DAG
+    from airflow.task.priority_strategy import PriorityWeightStrategy
+
+    # TODO: Task-SDK
+    Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+    """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
+class AbstractOperator(DAGNode):
+    """
+    Common implementation for operators, including unmapped and mapped.
+
+    This base class is more about sharing implementations, not defining a 
common
+    interface. Unfortunately it's difficult to use this as the common base 
class
+    for typing due to BaseOperator carrying too much historical baggage.
+
+    The union type ``from airflow.models.operator import Operator`` is easier
+    to use for typing purposes.
+
+    :meta private:
+    """
+
+    operator_class: type[BaseOperator] | dict[str, Any]
+
+    weight_rule: PriorityWeightStrategy
+    priority_weight: int
+
+    # Defines the operator level extra links.
+    operator_extra_links: Collection[BaseOperatorLink]
+
+    owner: str
+    task_id: str
+
+    outlets: list
+    inlets: list
+    # TODO:
+    # trigger_rule: TriggerRule
+    _needs_expansion: bool | None = None
+    _on_failure_fail_dagrun = False
+
+    HIDE_ATTRS_FROM_UI: ClassVar[frozenset[str]] = frozenset(
+        (
+            "log",
+            "dag",  # We show dag_id, don't need to show this too
+            "node_id",  # Duplicates task_id
+            "task_group",  # Doesn't have a useful repr, no point showing in UI
+            "inherits_from_empty_operator",  # impl detail
+            # Decide whether to start task execution from triggerer
+            "start_trigger_args",
+            "start_from_trigger",
+            # For compatibility with TG, for operators these are just the 
current task, no point showing
+            "roots",
+            "leaves",
+            # These lists are already shown via *_task_ids
+            "upstream_list",
+            "downstream_list",
+            # Not useful, implementation detail, already shown elsewhere
+            "global_operator_extra_link_dict",
+            "operator_extra_link_dict",
+        )
+    )
+
+    def get_dag(self) -> DAG | None:
+        raise NotImplementedError()
+
+    @property
+    def task_type(self) -> str:
+        raise NotImplementedError()
+
+    @property
+    def operator_name(self) -> str:
+        raise NotImplementedError()
+
+    @property
+    def inherits_from_empty_operator(self) -> bool:
+        raise NotImplementedError()
+
+    @property
+    def dag_id(self) -> str:
+        """Returns dag id if it has one or an adhoc + owner."""
+        dag = self.get_dag()
+        if dag:
+            return dag.dag_id
+        return f"adhoc_{self.owner}"
+
+    @property
+    def node_id(self) -> str:
+        return self.task_id

Review Comment:
   This also has a smell of a model that could use some cleanup. This is 
mainly to render a graph of nodes where operators/tasks are just some elements 
besides task groups and connections+labels?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to