This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aa36f045d73 Move determine_kwargs and KeywordParameters to SDK DecoratedOperator (#62746)
aa36f045d73 is described below
commit aa36f045d732e392dd7f43b1606855649044cf55
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Mar 3 02:32:12 2026 +0000
Move determine_kwargs and KeywordParameters to SDK DecoratedOperator (#62746)
Move the `KeywordParameters` class and `determine_kwargs()` from
`airflow.utils.operator_helpers` to
`task-sdk/src/airflow/sdk/bases/decorator.py`, next to `DecoratedOperator`,
which owns `python_callable` and `op_args`.
Add a `determine_kwargs()` instance method on `DecoratedOperator` (mirrors
the existing pattern on `PythonOperator`).
All 6 decorated operators now import the standalone `determine_kwargs` from
`common.compat.sdk` instead of `airflow.utils.operator_helpers`. Using the
compat layer (rather than the instance method) handles the case where a newer
provider runs against an older Task SDK whose `DecoratedOperator` lacks the
method. The compat fallback resolves to `airflow.utils.operator_helpers` on
older Airflow versions.
Other changes:
- `operator_helpers.py` re-exports via `add_deprecated_classes` so existing
  callers get a deprecation warning pointing to the SDK location
- `PythonOperator` imports `KeywordParameters` from compat SDK
- Affected provider pyproject.toml files marked with `# use next version`
  for the common-compat dependency
- Cleaned up stale `serializing()` reference in `KeywordParameters` docstring
- Removed redundant local `import itertools` (already at module level)
---
airflow-core/src/airflow/utils/operator_helpers.py | 89 +++++-----------------
providers/cncf/kubernetes/pyproject.toml | 2 +-
.../cncf/kubernetes/decorators/kubernetes_cmd.py | 2 +-
.../airflow/providers/common/ai/decorators/llm.py | 2 +-
.../providers/common/ai/decorators/llm_sql.py | 2 +-
.../src/airflow/providers/common/compat/sdk.py | 4 +
providers/common/sql/pyproject.toml | 2 +-
.../providers/common/sql/decorators/analytics.py | 2 +-
.../airflow/providers/common/sql/decorators/sql.py | 2 +-
providers/standard/pyproject.toml | 2 +-
.../airflow/providers/standard/decorators/bash.py | 2 +-
.../airflow/providers/standard/operators/python.py | 8 +-
task-sdk/src/airflow/sdk/bases/decorator.py | 64 ++++++++++++++++
13 files changed, 101 insertions(+), 82 deletions(-)
diff --git a/airflow-core/src/airflow/utils/operator_helpers.py
b/airflow-core/src/airflow/utils/operator_helpers.py
index 8b8d339b157..5de5a74eda6 100644
--- a/airflow-core/src/airflow/utils/operator_helpers.py
+++ b/airflow-core/src/airflow/utils/operator_helpers.py
@@ -17,82 +17,27 @@
# under the License.
from __future__ import annotations
-import inspect
-from collections.abc import Callable, Collection, Mapping
-from typing import Any, TypeVar
+from collections.abc import Callable
+from typing import TYPE_CHECKING, TypeVar
-R = TypeVar("R")
-
-
-class KeywordParameters:
- """
- Wrapper representing ``**kwargs`` to a callable.
-
- The actual ``kwargs`` can be obtained by calling either ``unpacking()`` or
- ``serializing()``. They behave almost the same and are only different if
- the containing ``kwargs`` is an Airflow Context object, and the calling
- function uses ``**kwargs`` in the argument list.
-
- In this particular case, ``unpacking()`` uses ``lazy-object-proxy`` to
- prevent the Context from emitting deprecation warnings too eagerly when
it's
- unpacked by ``**``. ``serializing()`` does not do this, and will allow the
- warnings to be emitted eagerly, which is useful when you want to dump the
- content and use it somewhere else without needing ``lazy-object-proxy``.
- """
-
- def __init__(self, kwargs: Mapping[str, Any]) -> None:
- self._kwargs = kwargs
-
- @classmethod
- def determine(
- cls,
- func: Callable[..., Any],
- args: Collection[Any],
- kwargs: Mapping[str, Any],
- ) -> KeywordParameters:
- import itertools
+from airflow.utils.deprecation_tools import add_deprecated_classes
- signature = inspect.signature(func)
- has_wildcard_kwargs = any(p.kind == p.VAR_KEYWORD for p in
signature.parameters.values())
+if TYPE_CHECKING:
+ from airflow.sdk.bases.decorator import KeywordParameters,
determine_kwargs # noqa: F401
- for name, param in itertools.islice(signature.parameters.items(),
len(args)):
- # Keyword-only arguments can't be passed positionally and are not
checked.
- if param.kind == inspect.Parameter.KEYWORD_ONLY:
- continue
- if param.kind == inspect.Parameter.VAR_KEYWORD:
- continue
+__all__ = ["make_kwargs_callable"]
- # Check if args conflict with names in kwargs.
- if name in kwargs:
- raise ValueError(f"The key {name!r} in args is a part of
kwargs and therefore reserved.")
-
- if has_wildcard_kwargs:
- # If the callable has a **kwargs argument, it's ready to accept
all the kwargs.
- return cls(kwargs)
-
- # If the callable has no **kwargs argument, it only wants the
arguments it requested.
- filtered_kwargs = {key: kwargs[key] for key in signature.parameters if
key in kwargs}
- return cls(filtered_kwargs)
-
- def unpacking(self) -> Mapping[str, Any]:
- """Dump the kwargs mapping to unpack with ``**`` in a function call."""
- return self._kwargs
-
-
-def determine_kwargs(
- func: Callable[..., Any],
- args: Collection[Any],
- kwargs: Mapping[str, Any],
-) -> Mapping[str, Any]:
- """
- Inspect the signature of a callable to determine which kwargs need to be
passed to the callable.
+R = TypeVar("R")
- :param func: The callable that you want to invoke
- :param args: The positional arguments that need to be passed to the
callable, so we know how many to skip.
- :param kwargs: The keyword arguments that need to be filtered before
passing to the callable.
- :return: A dictionary which contains the keyword arguments that are
compatible with the callable.
- """
- return KeywordParameters.determine(func, args, kwargs).unpacking()
+add_deprecated_classes(
+ {
+ __name__: {
+ "KeywordParameters":
"airflow.sdk.bases.decorator.KeywordParameters",
+ "determine_kwargs": "airflow.sdk.bases.decorator.determine_kwargs",
+ },
+ },
+ package=__name__,
+)
def make_kwargs_callable(func: Callable[..., R]) -> Callable[..., R]:
@@ -104,6 +49,8 @@ def make_kwargs_callable(func: Callable[..., R]) ->
Callable[..., R]:
"""
import functools
+ from airflow.sdk.bases.decorator import determine_kwargs
+
@functools.wraps(func)
def kwargs_func(*args, **kwargs):
kwargs = determine_kwargs(func, args, kwargs)
diff --git a/providers/cncf/kubernetes/pyproject.toml
b/providers/cncf/kubernetes/pyproject.toml
index bbefa2830f1..da26bb1c01a 100644
--- a/providers/cncf/kubernetes/pyproject.toml
+++ b/providers/cncf/kubernetes/pyproject.toml
@@ -60,7 +60,7 @@ requires-python = ">=3.10"
dependencies = [
"aiofiles>=23.2.0",
"apache-airflow>=2.11.0",
- "apache-airflow-providers-common-compat>=1.13.0",
+ "apache-airflow-providers-common-compat>=1.13.0", # use next version
"asgiref>=3.5.2",
# TODO(potiuk): We should bump cryptography to >=46.0.0 when
sqlalchemy>=2.0 is required
"cryptography>=41.0.0,<46.0.0",
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
index 3e710ce48e3..fb15f5097bc 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
@@ -25,9 +25,9 @@ from airflow.providers.common.compat.sdk import (
DecoratedOperator,
TaskDecorator,
context_merge,
+ determine_kwargs,
task_decorator_factory,
)
-from airflow.utils.operator_helpers import determine_kwargs
if TYPE_CHECKING:
from airflow.sdk import Context
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
index 04c194aed09..f21bcd343c0 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
@@ -33,10 +33,10 @@ from airflow.providers.common.compat.sdk import (
DecoratedOperator,
TaskDecorator,
context_merge,
+ determine_kwargs,
task_decorator_factory,
)
from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
-from airflow.utils.operator_helpers import determine_kwargs
if TYPE_CHECKING:
from airflow.sdk import Context
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_sql.py
b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_sql.py
index 25fa57b5d14..d0ebb1a9bb0 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_sql.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_sql.py
@@ -32,10 +32,10 @@ from airflow.providers.common.compat.sdk import (
DecoratedOperator,
TaskDecorator,
context_merge,
+ determine_kwargs,
task_decorator_factory,
)
from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
-from airflow.utils.operator_helpers import determine_kwargs
if TYPE_CHECKING:
from airflow.sdk import Context
diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
index e444085e0d8..bcd28c4799f 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -73,7 +73,9 @@ if TYPE_CHECKING:
from airflow.sdk.bases.decorator import (
DecoratedMappedOperator as DecoratedMappedOperator,
DecoratedOperator as DecoratedOperator,
+ KeywordParameters as KeywordParameters,
TaskDecorator as TaskDecorator,
+ determine_kwargs as determine_kwargs,
get_unique_task_id as get_unique_task_id,
task_decorator_factory as task_decorator_factory,
)
@@ -167,6 +169,8 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
"TaskDecorator": ("airflow.sdk.bases.decorator", "airflow.decorators"),
"task_decorator_factory": ("airflow.sdk.bases.decorator",
"airflow.decorators.base"),
"get_unique_task_id": ("airflow.sdk.bases.decorator",
"airflow.decorators.base"),
+ "KeywordParameters": ("airflow.sdk.bases.decorator",
"airflow.utils.operator_helpers"),
+ "determine_kwargs": ("airflow.sdk.bases.decorator",
"airflow.utils.operator_helpers"),
#
============================================================================
# Models
#
============================================================================
diff --git a/providers/common/sql/pyproject.toml
b/providers/common/sql/pyproject.toml
index 0b5cd1d0f86..cd9484e49ea 100644
--- a/providers/common/sql/pyproject.toml
+++ b/providers/common/sql/pyproject.toml
@@ -59,7 +59,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with
``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
- "apache-airflow-providers-common-compat>=1.12.0",
+ "apache-airflow-providers-common-compat>=1.12.0", # use next version
"sqlparse>=0.5.1",
"more-itertools>=9.0.0",
# The methodtools dependency is necessary since the introduction of
dialects:
diff --git
a/providers/common/sql/src/airflow/providers/common/sql/decorators/analytics.py
b/providers/common/sql/src/airflow/providers/common/sql/decorators/analytics.py
index 29fb66d872a..1c297ce8254 100644
---
a/providers/common/sql/src/airflow/providers/common/sql/decorators/analytics.py
+++
b/providers/common/sql/src/airflow/providers/common/sql/decorators/analytics.py
@@ -25,10 +25,10 @@ from airflow.providers.common.compat.sdk import (
DecoratedOperator,
TaskDecorator,
context_merge,
+ determine_kwargs,
task_decorator_factory,
)
from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
-from airflow.utils.operator_helpers import determine_kwargs
if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
diff --git
a/providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py
b/providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py
index 46eac567a6c..72f9b48da7f 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py
@@ -25,10 +25,10 @@ from airflow.providers.common.compat.sdk import (
DecoratedOperator,
TaskDecorator,
context_merge,
+ determine_kwargs,
task_decorator_factory,
)
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
-from airflow.utils.operator_helpers import determine_kwargs
if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
diff --git a/providers/standard/pyproject.toml
b/providers/standard/pyproject.toml
index 86d14eb3e4b..d6d9df1972e 100644
--- a/providers/standard/pyproject.toml
+++ b/providers/standard/pyproject.toml
@@ -59,7 +59,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with
``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
- "apache-airflow-providers-common-compat>=1.13.0",
+ "apache-airflow-providers-common-compat>=1.13.0", # use next version
]
# The optional dependencies should be modified in place in the generated file
diff --git
a/providers/standard/src/airflow/providers/standard/decorators/bash.py
b/providers/standard/src/airflow/providers/standard/decorators/bash.py
index e415223fc01..169a8dea58a 100644
--- a/providers/standard/src/airflow/providers/standard/decorators/bash.py
+++ b/providers/standard/src/airflow/providers/standard/decorators/bash.py
@@ -25,11 +25,11 @@ from airflow.providers.common.compat.sdk import (
DecoratedOperator,
TaskDecorator,
context_merge,
+ determine_kwargs,
task_decorator_factory,
)
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
-from airflow.utils.operator_helpers import determine_kwargs
if TYPE_CHECKING:
from airflow.providers.common.compat.sdk import Context
diff --git
a/providers/standard/src/airflow/providers/standard/operators/python.py
b/providers/standard/src/airflow/providers/standard/operators/python.py
index 7e6bbd166a6..cba2c1f31dd 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -47,7 +47,12 @@ from airflow.exceptions import (
DeserializingResultError,
)
from airflow.models.variable import Variable
-from airflow.providers.common.compat.sdk import AirflowException,
AirflowSkipException, context_merge
+from airflow.providers.common.compat.sdk import (
+ AirflowException,
+ AirflowSkipException,
+ KeywordParameters,
+ context_merge,
+)
from airflow.providers.common.compat.standard.operators import (
BaseAsyncOperator,
is_async_callable,
@@ -61,7 +66,6 @@ from airflow.providers.standard.utils.python_virtualenv
import (
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
from airflow.utils import hashlib_wrapper
from airflow.utils.file import get_unique_dag_module_name
-from airflow.utils.operator_helpers import KeywordParameters
if AIRFLOW_V_3_0_PLUS:
from airflow.providers.standard.operators.branch import BaseBranchOperator
diff --git a/task-sdk/src/airflow/sdk/bases/decorator.py
b/task-sdk/src/airflow/sdk/bases/decorator.py
index 131750ae8c9..53565c531a6 100644
--- a/task-sdk/src/airflow/sdk/bases/decorator.py
+++ b/task-sdk/src/airflow/sdk/bases/decorator.py
@@ -192,6 +192,67 @@ def is_async_callable(func):
return False
+class KeywordParameters:
+ """
+ Wrapper representing ``**kwargs`` to a callable.
+
+ The actual ``kwargs`` can be obtained by calling ``unpacking()``, which
+ returns the mapping suitable for unpacking with ``**`` in a function call.
+ """
+
+ def __init__(self, kwargs: Mapping[str, Any]) -> None:
+ self._kwargs = kwargs
+
+ @classmethod
+ def determine(
+ cls,
+ func: Callable[..., Any],
+ args: Collection[Any],
+ kwargs: Mapping[str, Any],
+ ) -> KeywordParameters:
+ signature = inspect.signature(func)
+ has_wildcard_kwargs = any(p.kind == p.VAR_KEYWORD for p in
signature.parameters.values())
+
+ for name, param in itertools.islice(signature.parameters.items(),
len(args)):
+ # Keyword-only arguments can't be passed positionally and are not
checked.
+ if param.kind == inspect.Parameter.KEYWORD_ONLY:
+ continue
+ if param.kind == inspect.Parameter.VAR_KEYWORD:
+ continue
+
+ # Check if args conflict with names in kwargs.
+ if name in kwargs:
+ raise ValueError(f"The key {name!r} in args is a part of
kwargs and therefore reserved.")
+
+ if has_wildcard_kwargs:
+ # If the callable has a **kwargs argument, it's ready to accept
all the kwargs.
+ return cls(kwargs)
+
+ # If the callable has no **kwargs argument, it only wants the
arguments it requested.
+ filtered_kwargs = {key: kwargs[key] for key in signature.parameters if
key in kwargs}
+ return cls(filtered_kwargs)
+
+ def unpacking(self) -> Mapping[str, Any]:
+ """Dump the kwargs mapping to unpack with ``**`` in a function call."""
+ return self._kwargs
+
+
+def determine_kwargs(
+ func: Callable[..., Any],
+ args: Collection[Any],
+ kwargs: Mapping[str, Any],
+) -> Mapping[str, Any]:
+ """
+ Inspect the signature of a callable to determine which kwargs need to be
passed to the callable.
+
+ :param func: The callable that you want to invoke
+ :param args: The positional arguments that need to be passed to the
callable, so we know how many to skip.
+ :param kwargs: The keyword arguments that need to be filtered before
passing to the callable.
+ :return: A dictionary which contains the keyword arguments that are
compatible with the callable.
+ """
+ return KeywordParameters.determine(func, args, kwargs).unpacking()
+
+
class DecoratedOperator(BaseOperator):
"""
Wraps a Python callable and captures args/kwargs when called for execution.
@@ -331,6 +392,9 @@ class DecoratedOperator(BaseOperator):
kwargs["op_kwargs"] = op_kwargs
return args, kwargs
+ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str,
Any]:
+ return KeywordParameters.determine(self.python_callable, self.op_args,
context).unpacking()
+
def get_python_source(self):
raw_source = inspect.getsource(self.python_callable)
raw_source_lines = [line for line in raw_source.splitlines() if not
line.strip().startswith("#")]