kaxil commented on code in PR #43076:
URL: https://github.com/apache/airflow/pull/43076#discussion_r1803726426
##########
task_sdk/src/airflow/sdk/__init__.py:
##########
@@ -16,6 +16,32 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
-def hello() -> str:
- return "Hello from task-sdk!"
+__all__ = ["DAG", "BaseOperator", "TaskGroup"]
+
+if TYPE_CHECKING:
+ from airflow.sdk.definitions.baseoperator import BaseOperator as
BaseOperator
+ from airflow.sdk.definitions.dag import DAG as DAG
+ from airflow.sdk.definitions.edges import EdgeModifier as EdgeModifier
+ from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup
Review Comment:
Since the `as` part uses the same name in each of these imports:
```suggestion
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.edges import EdgeModifier
from airflow.sdk.definitions.taskgroup import TaskGroup
```
##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+ Collection,
+ Iterable,
+)
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+ import jinja2 # Slow import.
+
+ from airflow.models.baseoperatorlink import BaseOperatorLink
+ from airflow.sdk.definitions.baseoperator import BaseOperator
+ from airflow.sdk.definitions.dag import DAG
+ from airflow.task.priority_strategy import PriorityWeightStrategy
+
+ # TODO: Task-SDK
+ Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+ """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
+class AbstractOperator(DAGNode):
+ """
+ Common implementation for operators, including unmapped and mapped.
+
+ This base class is more about sharing implementations, not defining a
common
+ interface. Unfortunately it's difficult to use this as the common base
class
+ for typing due to BaseOperator carrying too much historical baggage.
+
+ The union type ``from airflow.models.operator import Operator`` is easier
+ to use for typing purposes.
+
+ :meta private:
+ """
+
+ operator_class: type[BaseOperator] | dict[str, Any]
+
+ weight_rule: PriorityWeightStrategy
+ priority_weight: int
+
+ # Defines the operator level extra links.
+ operator_extra_links: Collection[BaseOperatorLink]
+
+ owner: str
+ task_id: str
+
+ outlets: list
+ inlets: list
+ # TODO:
+ # trigger_rule: TriggerRule
Review Comment:
Is this pulling a lot of the scheduler stuff?
##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+ Collection,
+ Iterable,
+)
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
Review Comment:
Should the `redact` function stay in `airflow.core` or be moved to SDK?
I am not firm on either yet
##########
task_sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -0,0 +1,1048 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import copy
+import functools
+import itertools
+import logging
+import os
+import pathlib
+import sys
+import weakref
+from collections import abc
+from collections.abc import Collection, Iterable, Iterator
+from datetime import datetime, timedelta, timezone
+from inspect import signature
+from re import Pattern
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ ClassVar,
+ Self,
+ Union,
+ cast,
+)
+from urllib.parse import urlsplit
+
+import attrs
+import jinja2
+import re2
+from dateutil.relativedelta import relativedelta
+
+import airflow.templates
+from airflow import settings
+from airflow.assets import Asset, AssetAlias, BaseAsset
+from airflow.configuration import conf as airflow_conf
+from airflow.exceptions import (
+ DuplicateTaskIdFound,
+ FailStopDagInvalidTriggerRule,
+ ParamValidationError,
+ TaskNotFound,
+)
+from airflow.models.param import DagParam
+from airflow.sdk.definitions.abstractoperator import AbstractOperator
+from airflow.sdk.definitions.baseoperator import BaseOperator
+from airflow.sdk.types import NOTSET
+from airflow.stats import Stats
+from airflow.timetables.base import Timetable
+from airflow.timetables.interval import CronDataIntervalTimetable,
DeltaDataIntervalTimetable
+from airflow.timetables.simple import (
+ AssetTriggeredTimetable,
+ ContinuousTimetable,
+ NullTimetable,
+ OnceTimetable,
Review Comment:
Where should Timetables live?
Users can pass them to the DAG, so maybe the SDK.
Yeah, all of it is intertwined.
[21:27](https://astronomer.slack.com/archives/DJ28R0NKZ/p1729110420516149)
Users can pass them to the DAG, so maybe the SDK.
##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+ Collection,
+ Iterable,
+)
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+ import jinja2 # Slow import.
+
+ from airflow.models.baseoperatorlink import BaseOperatorLink
+ from airflow.sdk.definitions.baseoperator import BaseOperator
+ from airflow.sdk.definitions.dag import DAG
+ from airflow.task.priority_strategy import PriorityWeightStrategy
+
+ # TODO: Task-SDK
+ Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+ """Raise if a task is neither mapped nor has any parent mapped groups."""
Review Comment:
A separate file with all the exceptions needed for the SDK sounds good.
##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+ Collection,
+ Iterable,
+)
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+ import jinja2 # Slow import.
+
+ from airflow.models.baseoperatorlink import BaseOperatorLink
+ from airflow.sdk.definitions.baseoperator import BaseOperator
+ from airflow.sdk.definitions.dag import DAG
+ from airflow.task.priority_strategy import PriorityWeightStrategy
+
+ # TODO: Task-SDK
+ Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+ """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
+class AbstractOperator(DAGNode):
+ """
+ Common implementation for operators, including unmapped and mapped.
+
+ This base class is more about sharing implementations, not defining a
common
+ interface. Unfortunately it's difficult to use this as the common base
class
+ for typing due to BaseOperator carrying too much historical baggage.
+
+ The union type ``from airflow.models.operator import Operator`` is easier
+ to use for typing purposes.
+
+ :meta private:
+ """
+
+ operator_class: type[BaseOperator] | dict[str, Any]
+
+ weight_rule: PriorityWeightStrategy
+ priority_weight: int
+
+ # Defines the operator level extra links.
+ operator_extra_links: Collection[BaseOperatorLink]
Review Comment:
We will have to bring this into the SDK too.
##########
task_sdk/src/airflow/sdk/definitions/baseoperator.py:
##########
@@ -0,0 +1,1141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import abc
+import collections.abc
+import contextlib
+import copy
+import inspect
+import warnings
+from collections.abc import Iterable, Sequence
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from functools import total_ordering, wraps
+from types import FunctionType
+from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, cast
+
+import attrs
+
+from airflow.exceptions import FailStopDagInvalidTriggerRule
+from airflow.sdk.definitions.abstractoperator import (
+ DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
+ DEFAULT_OWNER,
+ DEFAULT_POOL_SLOTS,
+ DEFAULT_PRIORITY_WEIGHT,
+ DEFAULT_QUEUE,
+ DEFAULT_RETRIES,
+ DEFAULT_RETRY_DELAY,
+ DEFAULT_TASK_EXECUTION_TIMEOUT,
+ DEFAULT_TRIGGER_RULE,
+ DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
+ DEFAULT_WEIGHT_RULE,
+ AbstractOperator,
+)
+from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack
+from airflow.sdk.definitions.node import validate_key
+from airflow.sdk.types import NOTSET, validate_instance_args
+from airflow.task.priority_strategy import PriorityWeightStrategy,
validate_and_load_priority_weight_strategy
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.types import AttributeRemoved
+
+T = TypeVar("T", bound=FunctionType)
+
+if TYPE_CHECKING:
+ # from ..execution_time.context import Context
+ class Context: ...
+
+ class ParamsDict: ...
+
+ from airflow.sdk.definitions.dag import DAG
+ from airflow.utils.operator_resources import Resources
+
+from airflow.sdk.definitions.taskgroup import TaskGroup
+
+# TODO: Task-SDK
+AirflowException = RuntimeError
+ParamsDict = dict
+
+
+def _get_parent_defaults(dag: DAG | None, task_group: TaskGroup | None) ->
tuple[dict, ParamsDict]:
+ if not dag:
+ return {}, ParamsDict()
+ dag_args = copy.copy(dag.default_args)
+ dag_params = copy.deepcopy(dag.params)
+ if task_group:
+ if task_group.default_args and not isinstance(task_group.default_args,
collections.abc.Mapping):
+ raise TypeError("default_args must be a mapping")
+ dag_args.update(task_group.default_args)
+ return dag_args, dag_params
+
+
+def get_merged_defaults(
+ dag: DAG | None,
+ task_group: TaskGroup | None,
+ task_params: collections.abc.MutableMapping | None,
+ task_default_args: dict | None,
+) -> tuple[dict, ParamsDict]:
+ args, params = _get_parent_defaults(dag, task_group)
+ if task_params:
+ if not isinstance(task_params, collections.abc.Mapping):
+ raise TypeError(f"params must be a mapping, got
{type(task_params)}")
+ params.update(task_params)
+ if task_default_args:
+ if not isinstance(task_default_args, collections.abc.Mapping):
+ raise TypeError(f"default_args must be a mapping, got
{type(task_params)}")
+ args.update(task_default_args)
+ with contextlib.suppress(KeyError):
+ params.update(task_default_args["params"] or {})
+ return args, params
+
+
+class BaseOperatorMeta(abc.ABCMeta):
+ """Metaclass of BaseOperator."""
+
+ @classmethod
+ def _apply_defaults(cls, func: T) -> T:
+ """
+ Look for an argument named "default_args", and fill the unspecified
arguments from it.
+
+ Since python2.* isn't clear about which arguments are missing when
+ calling a function, and that this can be quite confusing with
multi-level
+ inheritance and argument defaults, this decorator also alerts with
+ specific information about the missing arguments.
+ """
+ # Cache inspect.signature for the wrapper closure to avoid calling it
+ # at every decorated invocation. This is separate sig_cache created
+ # per decoration, i.e. each function decorated using apply_defaults
will
+ # have a different sig_cache.
+ sig_cache = inspect.signature(func)
+ non_variadic_params = {
+ name: param
+ for (name, param) in sig_cache.parameters.items()
+ if param.name != "self" and param.kind not in
(param.VAR_POSITIONAL, param.VAR_KEYWORD)
+ }
+ non_optional_args = {
+ name
+ for name, param in non_variadic_params.items()
+ if param.default == param.empty and name != "task_id"
+ }
+
+ fixup_decorator_warning_stack(func)
+
+ @wraps(func)
+ def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) ->
Any:
+ from airflow.sdk.definitions.contextmanager import DagContext,
TaskGroupContext
+
+ if args:
+ raise TypeError("Use keyword arguments when initializing
operators")
+
+ instantiated_from_mapped = kwargs.pop(
+ "_airflow_from_mapped",
+ getattr(self, "_BaseOperator__from_mapped", False),
+ )
+
+ dag: DAG | None = kwargs.get("dag") or DagContext.get_current()
+ task_group: TaskGroup | None = kwargs.get("task_group")
+ if dag and not task_group:
+ task_group = TaskGroupContext.get_current(dag)
+
+ default_args, merged_params = get_merged_defaults(
+ dag=dag,
+ task_group=task_group,
+ task_params=kwargs.pop("params", None),
+ task_default_args=kwargs.pop("default_args", None),
+ )
+
+ for arg in sig_cache.parameters:
+ if arg not in kwargs and arg in default_args:
+ kwargs[arg] = default_args[arg]
+
+ missing_args = non_optional_args.difference(kwargs)
+ if len(missing_args) == 1:
+ raise TypeError(f"missing keyword argument
{missing_args.pop()!r}")
+ elif missing_args:
+ display = ", ".join(repr(a) for a in sorted(missing_args))
+ raise TypeError(f"missing keyword arguments {display}")
+
+ if merged_params:
+ kwargs["params"] = merged_params
+
+ hook = getattr(self, "_hook_apply_defaults", None)
+ if hook:
+ args, kwargs = hook(**kwargs, default_args=default_args)
+ default_args = kwargs.pop("default_args", {})
+
+ if not hasattr(self, "_BaseOperator__init_kwargs"):
+ object.__setattr__(self, "_BaseOperator__init_kwargs", {})
+ object.__setattr__(self, "_BaseOperator__from_mapped",
instantiated_from_mapped)
+
+ result = func(self, **kwargs, default_args=default_args)
+
+ # Store the args passed to init -- we need them to support
task.map serialization!
+ self._BaseOperator__init_kwargs.update(kwargs) # type: ignore
+
+ # Set upstream task defined by XComArgs passed to template fields
of the operator.
+ # BUT: only do this _ONCE_, not once for each class in the
hierarchy
+ if not instantiated_from_mapped and func ==
self.__init__.__wrapped__: # type: ignore[misc]
+ self._set_xcomargs_dependencies()
+ # Mark instance as instantiated so that futre attr setting
updates xcomarg-based deps.
+ object.__setattr__(self, "_BaseOperator__instantiated", True)
+
+ return result
+
+ apply_defaults.__non_optional_args = non_optional_args # type: ignore
+ apply_defaults.__param_names = set(non_variadic_params) # type: ignore
+
+ return cast(T, apply_defaults)
+
+ def __new__(cls, name, bases, namespace, **kwargs):
+ # TODO: Task-SDK
+ # execute_method = namespace.get("execute")
+ # if callable(execute_method) and not getattr(execute_method,
"__isabstractmethod__", False):
+ # namespace["execute"] =
ExecutorSafeguard().decorator(execute_method)
+ new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
+ with contextlib.suppress(KeyError):
+ # Update the partial descriptor with the class method, so it calls
the actual function
+ # (but let subclasses override it if they need to)
+ # TODO: Task-SDK
+ # partial_desc = vars(new_cls)["partial"]
+ # if isinstance(partial_desc, _PartialDescriptor):
+ # partial_desc.class_method = classmethod(partial)
+ ...
+
+ # We patch `__init__` only if the class defines it.
+ if inspect.getmro(new_cls)[1].__init__ is not new_cls.__init__:
+ new_cls.__init__ = cls._apply_defaults(new_cls.__init__)
+
+ return new_cls
+
+
+# TODO: The following mapping is used to validate that the arguments passed to
the BaseOperator are of the
+# correct type. This is a temporary solution until we find a more
sophisticated method for argument
+# validation. One potential method is to use `get_type_hints` from the typing
module. However, this is not
+# fully compatible with future annotations for Python versions below 3.10.
Once we require a minimum Python
+# version that supports `get_type_hints` effectively or find a better
approach, we can replace this
+# manual type-checking method.
Review Comment:
Yeah worth solving but outside of this PR
##########
task_sdk/src/airflow/sdk/definitions/abstractoperator.py:
##########
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from abc import abstractmethod
+from collections.abc import (
+ Collection,
+ Iterable,
+)
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ ClassVar,
+)
+
+from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.log.secrets_masker import redact
+
+# TaskStateChangeCallback = Callable[[Context], None]
+
+if TYPE_CHECKING:
+ import jinja2 # Slow import.
+
+ from airflow.models.baseoperatorlink import BaseOperatorLink
+ from airflow.sdk.definitions.baseoperator import BaseOperator
+ from airflow.sdk.definitions.dag import DAG
+ from airflow.task.priority_strategy import PriorityWeightStrategy
+
+ # TODO: Task-SDK
+ Context = dict[str, Any]
+
+
+DEFAULT_OWNER: str = "airflow"
+DEFAULT_POOL_SLOTS: int = 1
+DEFAULT_PRIORITY_WEIGHT: int = 1
+DEFAULT_EXECUTOR: str | None = None
+DEFAULT_QUEUE: str = "default"
+DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
+DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
+DEFAULT_RETRIES: int = 0
+DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
+MAX_RETRY_DELAY: int = 24 * 60 * 60
+
+# TODO: Task-SDK
+# DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
+DEFAULT_TRIGGER_RULE = "all_success"
+DEFAULT_WEIGHT_RULE = "downstream"
+DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
+
+
+class NotMapped(Exception):
+ """Raise if a task is neither mapped nor has any parent mapped groups."""
+
+
+class AbstractOperator(DAGNode):
+ """
+ Common implementation for operators, including unmapped and mapped.
+
+ This base class is more about sharing implementations, not defining a
common
+ interface. Unfortunately it's difficult to use this as the common base
class
+ for typing due to BaseOperator carrying too much historical baggage.
+
+ The union type ``from airflow.models.operator import Operator`` is easier
+ to use for typing purposes.
+
+ :meta private:
+ """
+
+ operator_class: type[BaseOperator] | dict[str, Any]
+
+ weight_rule: PriorityWeightStrategy
+ priority_weight: int
+
+ # Defines the operator level extra links.
+ operator_extra_links: Collection[BaseOperatorLink]
+
+ owner: str
+ task_id: str
+
+ outlets: list
+ inlets: list
+ # TODO:
+ # trigger_rule: TriggerRule
+ _needs_expansion: bool | None = None
+ _on_failure_fail_dagrun = False
+
+ HIDE_ATTRS_FROM_UI: ClassVar[frozenset[str]] = frozenset(
+ (
+ "log",
+ "dag", # We show dag_id, don't need to show this too
+ "node_id", # Duplicates task_id
+ "task_group", # Doesn't have a useful repr, no point showing in UI
+ "inherits_from_empty_operator", # impl detail
+ # Decide whether to start task execution from triggerer
+ "start_trigger_args",
+ "start_from_trigger",
+ # For compatibility with TG, for operators these are just the
current task, no point showing
+ "roots",
+ "leaves",
+ # These lists are already shown via *_task_ids
+ "upstream_list",
+ "downstream_list",
+ # Not useful, implementation detail, already shown elsewhere
+ "global_operator_extra_link_dict",
+ "operator_extra_link_dict",
+ )
+ )
+
+ def get_dag(self) -> DAG | None:
+ raise NotImplementedError()
+
+ @property
+ def task_type(self) -> str:
+ raise NotImplementedError()
+
+ @property
+ def operator_name(self) -> str:
+ raise NotImplementedError()
+
+ @property
+ def inherits_from_empty_operator(self) -> bool:
Review Comment:
Yeah, this can be an independent follow-up.
##########
task_sdk/src/airflow/sdk/definitions/mixins.py:
##########
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from abc import abstractmethod
+from collections.abc import Iterable, Sequence
+from typing import TYPE_CHECKING, Any
+
+from ..types import NOTSET, ArgNotSet
+
+if TYPE_CHECKING:
+ from .baseoperator import BaseOperator
+ from .edges import EdgeModifier
Review Comment:
Relative imports for some reason?
##########
uv.lock:
##########
Review Comment:
> Unlike the pyproject.toml, which is used to specify the broad requirements
> of your project, the lockfile contains the exact resolved versions that are
> installed in the project environment. This file should be checked into version
> control, allowing for consistent and reproducible installations across machines.

https://docs.astral.sh/uv/concepts/projects/#project-lockfile
##########
task_sdk/src/airflow/sdk/definitions/baseoperator.py:
##########
@@ -0,0 +1,1141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import abc
+import collections.abc
+import contextlib
+import copy
+import inspect
+import warnings
+from collections.abc import Iterable, Sequence
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from functools import total_ordering, wraps
+from types import FunctionType
+from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, cast
+
+import attrs
+
+from airflow.exceptions import FailStopDagInvalidTriggerRule
+from airflow.sdk.definitions.abstractoperator import (
+ DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
+ DEFAULT_OWNER,
+ DEFAULT_POOL_SLOTS,
+ DEFAULT_PRIORITY_WEIGHT,
+ DEFAULT_QUEUE,
+ DEFAULT_RETRIES,
+ DEFAULT_RETRY_DELAY,
+ DEFAULT_TASK_EXECUTION_TIMEOUT,
+ DEFAULT_TRIGGER_RULE,
+ DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
+ DEFAULT_WEIGHT_RULE,
+ AbstractOperator,
+)
+from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack
+from airflow.sdk.definitions.node import validate_key
+from airflow.sdk.types import NOTSET, validate_instance_args
+from airflow.task.priority_strategy import PriorityWeightStrategy,
validate_and_load_priority_weight_strategy
Review Comment:
Hmm, this is currently only needed for types, but we do let users set it.
This would also be a complex one to untangle.
##########
uv.lock:
##########
Review Comment:
feature afaik
##########
task_sdk/src/airflow/sdk/definitions/baseoperator.py:
##########
@@ -0,0 +1,1141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import abc
+import collections.abc
+import contextlib
+import copy
+import inspect
+import warnings
+from collections.abc import Iterable, Sequence
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from functools import total_ordering, wraps
+from types import FunctionType
+from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, cast
+
+import attrs
+
+from airflow.exceptions import FailStopDagInvalidTriggerRule
+from airflow.sdk.definitions.abstractoperator import (
+ DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
+ DEFAULT_OWNER,
+ DEFAULT_POOL_SLOTS,
+ DEFAULT_PRIORITY_WEIGHT,
+ DEFAULT_QUEUE,
+ DEFAULT_RETRIES,
+ DEFAULT_RETRY_DELAY,
+ DEFAULT_TASK_EXECUTION_TIMEOUT,
+ DEFAULT_TRIGGER_RULE,
+ DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
+ DEFAULT_WEIGHT_RULE,
+ AbstractOperator,
+)
+from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack
+from airflow.sdk.definitions.node import validate_key
+from airflow.sdk.types import NOTSET, validate_instance_args
+from airflow.task.priority_strategy import PriorityWeightStrategy,
validate_and_load_priority_weight_strategy
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.types import AttributeRemoved
+
+# Type variable for the function wrapped by ``BaseOperatorMeta._apply_defaults``; the decorator is
+# annotated as returning the same function type it receives.
+T = TypeVar("T", bound=FunctionType)
+
+if TYPE_CHECKING:
+    # Placeholder declarations that only type checkers see.
+    # NOTE(review): the commented-out import suggests ``Context`` is meant to come from
+    # ``execution_time.context`` eventually -- confirm.
+    # from ..execution_time.context import Context
+    class Context: ...
+
+    class ParamsDict: ...
+
+    from airflow.sdk.definitions.dag import DAG
+    from airflow.utils.operator_resources import Resources
+
+from airflow.sdk.definitions.taskgroup import TaskGroup
+
+# TODO: Task-SDK
+# Runtime stand-ins: ``AirflowException`` is an alias of ``RuntimeError`` and ``ParamsDict`` is a
+# plain ``dict`` (the ``ParamsDict`` class above exists for type checking only).
+AirflowException = RuntimeError
+ParamsDict = dict
+
+
+def _get_parent_defaults(dag: DAG | None, task_group: TaskGroup | None) ->
tuple[dict, ParamsDict]:
+ if not dag:
+ return {}, ParamsDict()
+ dag_args = copy.copy(dag.default_args)
+ dag_params = copy.deepcopy(dag.params)
+ if task_group:
+ if task_group.default_args and not isinstance(task_group.default_args,
collections.abc.Mapping):
+ raise TypeError("default_args must be a mapping")
+ dag_args.update(task_group.default_args)
+ return dag_args, dag_params
+
+
+def get_merged_defaults(
+ dag: DAG | None,
+ task_group: TaskGroup | None,
+ task_params: collections.abc.MutableMapping | None,
+ task_default_args: dict | None,
+) -> tuple[dict, ParamsDict]:
+ args, params = _get_parent_defaults(dag, task_group)
+ if task_params:
+ if not isinstance(task_params, collections.abc.Mapping):
+ raise TypeError(f"params must be a mapping, got
{type(task_params)}")
+ params.update(task_params)
+ if task_default_args:
+ if not isinstance(task_default_args, collections.abc.Mapping):
+ raise TypeError(f"default_args must be a mapping, got
{type(task_params)}")
+ args.update(task_default_args)
+ with contextlib.suppress(KeyError):
+ params.update(task_default_args["params"] or {})
+ return args, params
+
+
+class BaseOperatorMeta(abc.ABCMeta):
+    """
+    Metaclass of BaseOperator.
+
+    On class creation, wraps the new class's ``__init__`` with :meth:`_apply_defaults`, but only
+    when that ``__init__`` differs from the one of the next class in the MRO.
+    """
+
+    @classmethod
+    def _apply_defaults(cls, func: T) -> T:
+        """
+        Look for an argument named "default_args", and fill the unspecified arguments from it.
+
+        Because it is not always clear which arguments are missing when calling a function, and
+        this can be quite confusing with multi-level inheritance and argument defaults, this
+        decorator also raises ``TypeError`` with specific information about the missing arguments.
+
+        :param func: the ``__init__`` function to wrap
+        :return: the wrapping function, cast to the type of ``func``
+        """
+        # Cache inspect.signature for the wrapper closure to avoid calling it
+        # at every decorated invocation. This is separate sig_cache created
+        # per decoration, i.e. each function decorated using apply_defaults will
+        # have a different sig_cache.
+        sig_cache = inspect.signature(func)
+        # Every named parameter of ``func`` except ``self``, ``*args`` and ``**kwargs``.
+        non_variadic_params = {
+            name: param
+            for (name, param) in sig_cache.parameters.items()
+            if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
+        }
+        # Parameters without a default value; "task_id" is deliberately left out of this check.
+        non_optional_args = {
+            name
+            for name, param in non_variadic_params.items()
+            if param.default == param.empty and name != "task_id"
+        }
+
+        fixup_decorator_warning_stack(func)
+
+        # ``wraps`` also sets ``apply_defaults.__wrapped__ = func``, which the
+        # "only once per hierarchy" check further down relies on.
+        @wraps(func)
+        def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any:
+            from airflow.sdk.definitions.contextmanager import DagContext, TaskGroupContext
+
+            if args:
+                raise TypeError("Use keyword arguments when initializing operators")
+
+            # An explicit ``_airflow_from_mapped`` kwarg wins; otherwise reuse the flag already
+            # stored on the instance, defaulting to False.
+            instantiated_from_mapped = kwargs.pop(
+                "_airflow_from_mapped",
+                getattr(self, "_BaseOperator__from_mapped", False),
+            )
+
+            # Fall back to the current DAG / task group context when none is passed explicitly.
+            dag: DAG | None = kwargs.get("dag") or DagContext.get_current()
+            task_group: TaskGroup | None = kwargs.get("task_group")
+            if dag and not task_group:
+                task_group = TaskGroupContext.get_current(dag)
+
+            default_args, merged_params = get_merged_defaults(
+                dag=dag,
+                task_group=task_group,
+                task_params=kwargs.pop("params", None),
+                task_default_args=kwargs.pop("default_args", None),
+            )
+
+            # Only parameters declared by the wrapped function, and not passed by the caller,
+            # are filled in from default_args.
+            for arg in sig_cache.parameters:
+                if arg not in kwargs and arg in default_args:
+                    kwargs[arg] = default_args[arg]
+
+            missing_args = non_optional_args.difference(kwargs)
+            if len(missing_args) == 1:
+                raise TypeError(f"missing keyword argument {missing_args.pop()!r}")
+            elif missing_args:
+                display = ", ".join(repr(a) for a in sorted(missing_args))
+                raise TypeError(f"missing keyword arguments {display}")
+
+            if merged_params:
+                kwargs["params"] = merged_params
+
+            # Optional per-instance hook that may rewrite kwargs and default_args.
+            hook = getattr(self, "_hook_apply_defaults", None)
+            if hook:
+                args, kwargs = hook(**kwargs, default_args=default_args)
+                default_args = kwargs.pop("default_args", {})
+
+            # ``object.__setattr__`` is used so these writes do not go through any ``__setattr__``
+            # defined on the operator class.
+            if not hasattr(self, "_BaseOperator__init_kwargs"):
+                object.__setattr__(self, "_BaseOperator__init_kwargs", {})
+            object.__setattr__(self, "_BaseOperator__from_mapped", instantiated_from_mapped)
+
+            result = func(self, **kwargs, default_args=default_args)
+
+            # Store the args passed to init -- we need them to support task.map serialization!
+            self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
+
+            # Set upstream task defined by XComArgs passed to template fields of the operator.
+            # BUT: only do this _ONCE_, not once for each class in the hierarchy
+            if not instantiated_from_mapped and func == self.__init__.__wrapped__:  # type: ignore[misc]
+                self._set_xcomargs_dependencies()
+                # Mark instance as instantiated so that future attr setting updates xcomarg-based deps.
+                object.__setattr__(self, "_BaseOperator__instantiated", True)
+
+            return result
+
+        # Inside this class body the double-underscore names are mangled, so these end up as
+        # ``_BaseOperatorMeta__non_optional_args`` / ``_BaseOperatorMeta__param_names`` on the wrapper.
+        apply_defaults.__non_optional_args = non_optional_args  # type: ignore
+        apply_defaults.__param_names = set(non_variadic_params)  # type: ignore
+
+        return cast(T, apply_defaults)
+
+    def __new__(cls, name, bases, namespace, **kwargs):
+        """Create the class and wrap its ``__init__`` with ``_apply_defaults`` if it defines one."""
+        # TODO: Task-SDK
+        # execute_method = namespace.get("execute")
+        # if callable(execute_method) and not getattr(execute_method, "__isabstractmethod__", False):
+        #     namespace["execute"] = ExecutorSafeguard().decorator(execute_method)
+        new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
+        with contextlib.suppress(KeyError):
+            # Update the partial descriptor with the class method, so it calls the actual function
+            # (but let subclasses override it if they need to)
+            # TODO: Task-SDK
+            # partial_desc = vars(new_cls)["partial"]
+            # if isinstance(partial_desc, _PartialDescriptor):
+            #     partial_desc.class_method = classmethod(partial)
+            ...
+
+        # We patch `__init__` only if the class defines it.
+        if inspect.getmro(new_cls)[1].__init__ is not new_cls.__init__:
+            new_cls.__init__ = cls._apply_defaults(new_cls.__init__)
+
+        return new_cls
+
+
+# TODO: The following mapping is used to validate that the arguments passed to the BaseOperator are of the
+# correct type. This is a temporary solution until we find a more sophisticated method for argument
+# validation. One potential method is to use `get_type_hints` from the typing module. However, this is not
+# fully compatible with future annotations for Python versions below 3.10. Once we require a minimum Python
+# version that supports `get_type_hints` effectively or find a better approach, we can replace this
+# manual type-checking method.
+#
+# Maps an argument name to the type, or tuple of types, accepted for it.
+BASEOPERATOR_ARGS_EXPECTED_TYPES = {
+    "task_id": str,
+    "email": (str, Sequence),
+    "email_on_retry": bool,
+    "email_on_failure": bool,
+    "retries": int,
+    "retry_exponential_backoff": bool,
+    "depends_on_past": bool,
+    "ignore_first_depends_on_past": bool,
+    "wait_for_past_depends_before_skipping": bool,
+    "wait_for_downstream": bool,
+    "priority_weight": int,
+    "queue": str,
+    "pool": str,
+    "pool_slots": int,
+    "trigger_rule": str,
+    "run_as_user": str,
+    "task_concurrency": int,
+    "map_index_template": str,
+    "max_active_tis_per_dag": int,
+    "max_active_tis_per_dagrun": int,
+    "executor": str,
+    "do_xcom_push": bool,
+    "multiple_outputs": bool,
+    "doc": str,
+    "doc_md": str,
+    "doc_json": str,
+    "doc_yaml": str,
+    "doc_rst": str,
+    "task_display_name": str,
+    "logger_name": str,
+    "allow_nested_operators": bool,
+    "start_date": datetime,
+    "end_date": datetime,
+}
+
+
+# Note: BaseOperator is defined as a dataclass, and not an attrs class, as we do too much metaprogramming in
+# here (metaclass, custom `__setattr__` behaviour) and this fights with attrs too much to make it worth it.
+#
+# To future readers: if you want to try and make this a "normal" attrs class, go ahead and attempt it. If you
+# get nowhere, leave a record here for the next poor soul of what problems you ran into.
+#
+# @ashb, 2024/10/14
+# - "Can't combine custom __setattr__ with on_setattr hooks"
+# - Setting class-wide `define(on_setattr=...)` isn't called for non-attrs subclasses
+@total_ordering
+@dataclass(repr=False, kw_only=True)
+class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
+ r"""
+ Abstract base class for all operators.
+
+ Since operators create objects that become nodes in the DAG, BaseOperator
+ contains many recursive methods for DAG crawling behavior. To derive from
+ this class, you are expected to override the constructor and the 'execute'
+ method.
+
+ Operators derived from this class should perform or trigger certain tasks
+ synchronously (wait for completion). Example of operators could be an
+ operator that runs a Pig job (PigOperator), a sensor operator that
+ waits for a partition to land in Hive (HiveSensorOperator), or one that
+ moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these
+ operators (tasks) target specific operations, running specific scripts,
+ functions or data transfers.
+
+ This class is abstract and shouldn't be instantiated. Instantiating a
+ class derived from this one results in the creation of a task object,
+ which ultimately becomes a node in DAG objects. Task dependencies should
+ be set by using the set_upstream and/or set_downstream methods.
+
+ :param task_id: a unique, meaningful id for the task
+ :param owner: the owner of the task. Using a meaningful description
+ (e.g. user/person/team/role name) to clarify ownership is recommended.
+ :param email: the 'to' email address(es) used in email alerts. This can be
a
+ single email or multiple ones. Multiple addresses can be specified as a
+ comma or semicolon separated string or by passing a list of strings.
+ :param email_on_retry: Indicates whether email alerts should be sent when a
+ task is retried
+ :param email_on_failure: Indicates whether email alerts should be sent when
+ a task failed
+ :param retries: the number of retries that should be performed before
+ failing the task
+ :param retry_delay: delay between retries, can be set as ``timedelta`` or
+ ``float`` seconds, which will be converted into ``timedelta``,
+ the default is ``timedelta(seconds=300)``.
+ :param retry_exponential_backoff: allow progressively longer waits between
+ retries by using exponential backoff algorithm on retry delay (delay
+ will be converted into seconds)
+ :param max_retry_delay: maximum delay interval between retries, can be set
as
+ ``timedelta`` or ``float`` seconds, which will be converted into
``timedelta``.
+ :param start_date: The ``start_date`` for the task, determines
+ the ``execution_date`` for the first task instance. The best practice
+ is to have the start_date rounded
+ to your DAG's ``schedule_interval``. Daily jobs have their start_date
+ some day at 00:00:00, hourly jobs have their start_date at 00:00
+ of a specific hour. Note that Airflow simply looks at the latest
+ ``execution_date`` and adds the ``schedule_interval`` to determine
+ the next ``execution_date``. It is also very important
+ to note that different tasks' dependencies
+ need to line up in time. If task A depends on task B and their
+ start_date are offset in a way that their execution_date don't line
+ up, A's dependencies will never be met. If you are looking to delay
+ a task, for example running a daily task at 2AM, look into the
+ ``TimeSensor`` and ``TimeDeltaSensor``. We advise against using
+ dynamic ``start_date`` and recommend using fixed ones. Read the
+ FAQ entry about start_date for more information.
+ :param end_date: if specified, the scheduler won't go beyond this date
+ :param depends_on_past: when set to true, task instances will run
+ sequentially and only if the previous instance has succeeded or has
been skipped.
+ The task instance for the start_date is allowed to run.
+ :param wait_for_past_depends_before_skipping: when set to true, if the
task instance
+ should be marked as skipped, and depends_on_past is true, the ti will
stay on None state
+ waiting the task of the previous run
+ :param wait_for_downstream: when set to true, an instance of task
+ X will wait for tasks immediately downstream of the previous instance
+ of task X to finish successfully or be skipped before it runs. This is
useful if the
+ different instances of a task X alter the same asset, and this asset
+ is used by tasks downstream of task X. Note that depends_on_past
+ is forced to True wherever wait_for_downstream is used. Also note that
+ only tasks *immediately* downstream of the previous task instance are
waited
+ for; the statuses of any tasks further downstream are ignored.
+ :param dag: a reference to the dag the task is attached to (if any)
+ :param priority_weight: priority weight of this task against other task.
+ This allows the executor to trigger higher priority tasks before
+ others when things get backed up. Set priority_weight as a higher
+ number for more important tasks.
+ :param weight_rule: weighting method used for the effective total
+ priority weight of the task. Options are:
+ ``{ downstream | upstream | absolute }`` default is ``downstream``
+ When set to ``downstream`` the effective weight of the task is the
+ aggregate sum of all downstream descendants. As a result, upstream
+ tasks will have higher weight and will be scheduled more aggressively
+ when using positive weight values. This is useful when you have
+ multiple dag run instances and desire to have all upstream tasks to
+ complete for all runs before each dag can continue processing
+ downstream tasks. When set to ``upstream`` the effective weight is the
+ aggregate sum of all upstream ancestors. This is the opposite where
+ downstream tasks have higher weight and will be scheduled more
+ aggressively when using positive weight values. This is useful when you
+ have multiple dag run instances and prefer to have each dag complete
+ before starting upstream tasks of other dags. When set to
+ ``absolute``, the effective weight is the exact ``priority_weight``
+ specified without additional weighting. You may want to do this when
+ you know exactly what priority weight each task should have.
+ Additionally, when set to ``absolute``, there is bonus effect of
+ significantly speeding up the task creation process as for very large
+ DAGs. Options can be set as string or using the constants defined in
+ the static class ``airflow.utils.WeightRule``
+ |experimental|
+ Since 2.9.0, Airflow allows to define custom priority weight strategy,
+ by creating a subclass of
+ ``airflow.task.priority_strategy.PriorityWeightStrategy`` and
registering
+ in a plugin, then providing the class path or the class instance via
+ ``weight_rule`` parameter. The custom priority weight strategy will be
+ used to calculate the effective total priority weight of the task
instance.
+ :param queue: which queue to target when running this job. Not
+ all executors implement queue management, the CeleryExecutor
+ does support targeting specific queues.
+ :param pool: the slot pool this task should run in, slot pools are a
+ way to limit concurrency for certain tasks
+ :param pool_slots: the number of pool slots this task should use (>= 1)
+ Values less than 1 are not allowed.
+ :param sla: time by which the job is expected to succeed. Note that
+ this represents the ``timedelta`` after the period is closed. For
+ example if you set an SLA of 1 hour, the scheduler would send an email
+ soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
+ has not succeeded yet.
+ The scheduler pays special attention for jobs with an SLA and
+ sends alert
+ emails for SLA misses. SLA misses are also recorded in the database
+ for future reference. All tasks that share the same SLA time
+ get bundled in a single email, sent soon after that time. SLA
+ notification are sent once and only once for each task instance.
+ :param execution_timeout: max time allowed for the execution of
+ this task instance, if it goes beyond it will raise and fail.
+ :param on_failure_callback: a function or list of functions to be called
when a task instance
+ of this task fails. a context dictionary is passed as a single
+ parameter to this function. Context contains references to related
+ objects to the task instance and is documented under the macros
+ section of the API.
+ :param on_execute_callback: much like the ``on_failure_callback`` except
+ that it is executed right before the task is executed.
+ :param on_retry_callback: much like the ``on_failure_callback`` except
+ that it is executed when retries occur.
+ :param on_success_callback: much like the ``on_failure_callback`` except
+ that it is executed when the task succeeds.
+ :param on_skipped_callback: much like the ``on_failure_callback`` except
+ that it is executed when skipped occur; this callback will be called
only if AirflowSkipException get raised.
+ Explicitly it is NOT called if a task is not started to be executed
because of a preceding branching
+ decision in the DAG or a trigger rule which causes execution to skip
so that the task execution
+ is never scheduled.
+ :param pre_execute: a function to be called immediately before task
+ execution, receiving a context dictionary; raising an exception will
+ prevent the task from being executed.
+
+ |experimental|
+ :param post_execute: a function to be called immediately after task
+ execution, receiving a context dictionary and task result; raising an
+ exception will prevent the task from succeeding.
+
+ |experimental|
+ :param trigger_rule: defines the rule by which dependencies are applied
+ for the task to get triggered. Options are:
+ ``{ all_success | all_failed | all_done | all_skipped | one_success |
one_done |
+ one_failed | none_failed | none_failed_min_one_success | none_skipped
| always}``
+ default is ``all_success``. Options can be set as string or
+ using the constants defined in the static class
+ ``airflow.utils.TriggerRule``
+ :param resources: A map of resource parameter names (the argument names of
the
+ Resources constructor) to their values.
+ :param run_as_user: unix username to impersonate while running the task
+ :param max_active_tis_per_dag: When set, a task will be able to limit the
concurrent
+ runs across execution_dates.
+ :param max_active_tis_per_dagrun: When set, a task will be able to limit
the concurrent
+ task instances per DAG run.
+ :param executor: Which executor to target when running this task. NOT YET
SUPPORTED
+ :param executor_config: Additional task-level configuration parameters
that are
+ interpreted by a specific executor. Parameters are namespaced by the
name of
+ executor.
+
+ **Example**: to run this task in a specific docker container through
+ the KubernetesExecutor ::
+
+ MyOperator(..., executor_config={"KubernetesExecutor": {"image":
"myCustomDockerImage"}})
+
+ :param do_xcom_push: if True, an XCom is pushed containing the Operator's
+ result
+ :param multiple_outputs: if True and do_xcom_push is True, pushes multiple
XComs, one for each
+ key in the returned dictionary result. If False and do_xcom_push is
True, pushes a single XCom.
+ :param task_group: The TaskGroup to which the task should belong. This is
typically provided when not
+ using a TaskGroup as a context manager.
+ :param doc: Add documentation or notes to your Task objects that is
visible in
+ Task Instance details View in the Webserver
+ :param doc_md: Add documentation (in Markdown format) or notes to your
Task objects
+ that is visible in Task Instance details View in the Webserver
+ :param doc_rst: Add documentation (in RST format) or notes to your Task
objects
+ that is visible in Task Instance details View in the Webserver
+ :param doc_json: Add documentation (in JSON format) or notes to your Task
objects
+ that is visible in Task Instance details View in the Webserver
+ :param doc_yaml: Add documentation (in YAML format) or notes to your Task
objects
+ that is visible in Task Instance details View in the Webserver
+ :param task_display_name: The display name of the task which appears on
the UI.
+ :param logger_name: Name of the logger used by the Operator to emit logs.
+ If set to `None` (default), the logger name will fall back to
+ `airflow.task.operators.{class.__module__}.{class.__name__}` (e.g.
SimpleHttpOperator will have
+
*airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator*
as logger).
+ :param allow_nested_operators: if True, when an operator is executed
within another one a warning message
+ will be logged. If False, then an exception will be raised if the
operator is badly used (e.g. nested
+ within another one). In future releases of Airflow this parameter will
be removed and an exception
+ will always be thrown when operators are nested within each other
(default is True).
+
+ **Example**: example of a bad operator mixin usage::
+
+ @task(provide_context=True)
+ def say_hello_world(**context):
+ hello_world_task = BashOperator(
+ task_id="hello_world_task",
+ bash_command="python -c \"print('Hello, world!')\"",
+ dag=dag,
+ )
+ hello_world_task.execute(context)
+ """
+
+ task_id: str
+ owner: str = DEFAULT_OWNER
+ email: str | Sequence[str] | None = None
+ email_on_retry: bool = True
+ email_on_failure: bool = True
+ retries: int | None = DEFAULT_RETRIES
+ retry_delay: timedelta | float = DEFAULT_RETRY_DELAY
+ retry_exponential_backoff: bool = False
+ max_retry_delay: timedelta | float | None = None
+ start_date: datetime | None = None
+ end_date: datetime | None = None
+ depends_on_past: bool = False
+ ignore_first_depends_on_past: bool = DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST
+ wait_for_past_depends_before_skipping: bool =
DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING
+ wait_for_downstream: bool = False
+ dag: DAG | None = None
+ params: collections.abc.MutableMapping | None = None
+ default_args: dict | None = None
+ priority_weight: int = DEFAULT_PRIORITY_WEIGHT
+ # TODO:
+ weight_rule: PriorityWeightStrategy | str = DEFAULT_WEIGHT_RULE
+ queue: str = DEFAULT_QUEUE
+ pool: str = "default"
+ pool_slots: int = DEFAULT_POOL_SLOTS
+ execution_timeout: timedelta | None = DEFAULT_TASK_EXECUTION_TIMEOUT
+ # on_execute_callback: None | TaskStateChangeCallback |
list[TaskStateChangeCallback] = None
+ # on_failure_callback: None | TaskStateChangeCallback |
list[TaskStateChangeCallback] = None
+ # on_success_callback: None | TaskStateChangeCallback |
list[TaskStateChangeCallback] = None
+ # on_retry_callback: None | TaskStateChangeCallback |
list[TaskStateChangeCallback] = None
+ # on_skipped_callback: None | TaskStateChangeCallback |
list[TaskStateChangeCallback] = None
+ # pre_execute: TaskPreExecuteHook | None = None
+ # post_execute: TaskPostExecuteHook | None = None
Review Comment:
These are still experimental -- wonder if we should just kill them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]