This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c6c34243b9 Order of task arguments in task definition causing error 
when parsing DAG (#62174)
5c6c34243b9 is described below

commit 5c6c34243b9a72782b1049a1bd4b64dcc13ab4ff
Author: Software Developer <[email protected]>
AuthorDate: Mon Mar 23 14:30:50 2026 +0100

    Order of task arguments in task definition causing error when parsing DAG 
(#62174)
    
    * *fix the issue when -> order of task arguments in task definition causing 
error when parsing DAG
    
    * *add more related unit tests.
    
    * *apply PR remarks. unify a bit of naming.
    
    * *reorganize tests.
    
    * *fix for tests.
    
    * *address PR remarks.
    
    * *address PR remarks.
    
    * *address PR remarks.
    
    * *address PR remarks.
    
    * *address PR remarks.
    
    * *address PR remarks.
---
 task-sdk/src/airflow/sdk/bases/decorator.py     |  38 ++++++
 task-sdk/tests/task_sdk/bases/test_decorator.py | 147 +++++++++++++++++++++++-
 2 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/task-sdk/src/airflow/sdk/bases/decorator.py 
b/task-sdk/src/airflow/sdk/bases/decorator.py
index 28119ced429..2e98f826c7e 100644
--- a/task-sdk/src/airflow/sdk/bases/decorator.py
+++ b/task-sdk/src/airflow/sdk/bases/decorator.py
@@ -313,6 +313,33 @@ class DecoratedOperator(BaseOperator):
             param.replace(default=None) if param.name in KNOWN_CONTEXT_KEYS 
else param
             for param in signature.parameters.values()
         ]
+
+        # Python requires that positional parameters with defaults don't 
precede those without.
+        # This only applies to POSITIONAL_ONLY and POSITIONAL_OR_KEYWORD 
parameters — *args,
+        # **kwargs, and keyword-only parameters follow different rules.
+        positional_kinds = (inspect.Parameter.POSITIONAL_ONLY, 
inspect.Parameter.POSITIONAL_OR_KEYWORD)
+        positional = [(i, p) for i, p in enumerate(parameters) if p.kind in 
positional_kinds]
+        first_default_idx = next((i for i, p in positional if p.default != 
inspect.Parameter.empty), None)
+
+        # Names of non-context-key params that receive an injected None 
default purely to satisfy
+        # Python's ordering constraint. These params are still semantically 
required and must be
+        # explicitly provided via op_args/op_kwargs — we verify this below 
after bind().
+        injected_for_ordering: set[str] = set()
+        if first_default_idx is not None:
+            new_parameters = []
+            for i, param in enumerate(parameters):
+                if (
+                    i > first_default_idx
+                    and param.kind in positional_kinds
+                    and param.default == inspect.Parameter.empty
+                ):
+                    new_parameters.append(param.replace(default=None))
+                    if param.name not in KNOWN_CONTEXT_KEYS:
+                        injected_for_ordering.add(param.name)
+                else:
+                    new_parameters.append(param)
+            parameters = new_parameters
+
         try:
             signature = signature.replace(parameters=parameters)
         except ValueError as err:
@@ -342,6 +369,17 @@ class DecoratedOperator(BaseOperator):
         else:
             signature.bind(*op_args, **op_kwargs)
 
+        # Params in injected_for_ordering are semantically required even 
though they received a
+        # None default to satisfy Python's ordering constraint. Verify they 
are actually provided.
+        if injected_for_ordering and not 
kwargs.get("_airflow_mapped_validation_only"):
+            positional_param_names = [p.name for p in parameters if p.kind in 
positional_kinds]
+            covered_by_pos = set(positional_param_names[: len(op_args)])
+            provided = covered_by_pos | set(op_kwargs.keys())
+            missing = injected_for_ordering - provided
+            if missing:
+                missing_str = ", ".join(sorted(missing))
+                raise TypeError(f"missing required argument(s): {missing_str}")
+
         self.op_args = op_args
         self.op_kwargs = op_kwargs
         super().__init__(task_id=task_id, **kwargs_to_upstream, **kwargs)
diff --git a/task-sdk/tests/task_sdk/bases/test_decorator.py 
b/task-sdk/tests/task_sdk/bases/test_decorator.py
index 913e4c6e5f0..6021f6f0aad 100644
--- a/task-sdk/tests/task_sdk/bases/test_decorator.py
+++ b/task-sdk/tests/task_sdk/bases/test_decorator.py
@@ -25,7 +25,7 @@ from pathlib import Path
 import pytest
 
 from airflow.sdk import task
-from airflow.sdk.bases.decorator import DecoratedOperator, is_async_callable
+from airflow.sdk.bases.decorator import KNOWN_CONTEXT_KEYS, DecoratedOperator, 
is_async_callable
 
 RAW_CODE = """
 from airflow.sdk import task
@@ -69,6 +69,151 @@ class TestBaseDecorator:
         assert cleaned.lstrip().splitlines()[0].startswith("def a_task")
 
 
+class DummyDecoratedOperator(DecoratedOperator):
+    custom_operator_name = "@task.dummy"
+
+    def execute(self, context):
+        return self.python_callable(*self.op_args, **self.op_kwargs)
+
+
+class TestDefaultFillingLogic:
+    @pytest.mark.parametrize(
+        ("func", "kwargs", "args"),
+        [
+            pytest.param(
+                lambda: 42,
+                {},
+                [],
+                id="no_params",
+            ),
+            pytest.param(
+                lambda x, y=99: (x, y),
+                {"x": 1},
+                [],
+                id="param_after_first_default_is_given_none",
+            ),
+            pytest.param(
+                lambda a, b=5, c=None: (a, b, c),
+                {"a": 1},
+                [],
+                id="all_params_after_first_default_already_have_defaults",
+            ),
+            pytest.param(
+                lambda a, b, c=99: (a, b, c),
+                {},
+                [1, 2],
+                id="single_trailing_optional",
+            ),
+        ],
+    )
+    def test_construction_succeeds(self, func, kwargs, args):
+        op = make_op(func, op_kwargs=kwargs, op_args=args)
+        assert op is not None
+
+    def test_construction_succeeds_with_context_key_params(self):
+        def foo(ds, my_data):
+            return my_data
+
+        assert make_op(foo, op_args=[None, None]) is not None
+
+    def test_context_key_default_none_does_not_raise(self):
+        ctx_key = next(iter(sorted(KNOWN_CONTEXT_KEYS)))
+        f = _make_func(f"def dummy_task(x, {ctx_key}=None): return x")
+        assert make_op(f, op_kwargs={"x": 1}) is not None
+
+    def test_context_key_with_non_none_default_raises(self):
+        ctx_key = next(iter(sorted(KNOWN_CONTEXT_KEYS)))
+        f = _make_func(f"def dummy_task(x, {ctx_key}='bad_default'): return x")
+        with pytest.raises(ValueError, match="can't have a default other than 
None"):
+            make_op(f, op_kwargs={"x": 1})
+
+    @pytest.mark.parametrize(
+        ("func_src", "op_kwargs"),
+        [
+            pytest.param(
+                "def dummy_task({ctx0}, x, y=10): return (x, y)",
+                {"x": 1},
+                id="context_key_before_first_default_shifts_boundary",
+            ),
+            pytest.param(
+                "def dummy_task(x, y=5, {ctx0}=None): return (x, y)",
+                {"x": 1},
+                id="context_key_after_regular_default",
+            ),
+            pytest.param(
+                "def dummy_task(a, {ctx0}=None, b=7, {ctx1}=None): return (a, 
b)",
+                {"a": 1},
+                id="multiple_context_keys_mixed_with_regular_defaults",
+            ),
+            pytest.param(
+                "def dummy_task({ctx0}, x, y=10): return (x, y)",
+                {"x": 42},
+                
id="required_param_between_context_key_and_regular_default_gets_none",
+            ),
+            pytest.param(
+                "def dummy_task({ctx0}=None, {ctx1}=None, {ctx2}=None): return 
True",
+                {},
+                id="context_key_only_signature",
+            ),
+        ],
+    )
+    def test_context_key_construction_succeeds(self, func_src, op_kwargs):
+        """All context-key signature shapes must construct without raising."""
+
+        ctx_keys = sorted(KNOWN_CONTEXT_KEYS)
+        src = func_src.format(
+            ctx0=ctx_keys[0],
+            ctx1=ctx_keys[1] if len(ctx_keys) > 1 else ctx_keys[0],
+            ctx2=ctx_keys[2] if len(ctx_keys) > 2 else ctx_keys[0],
+        )
+        op = make_op(_make_func(src), op_kwargs=op_kwargs)
+        assert op is not None
+
+    def test_non_context_param_after_context_key_gets_none_injected(self):
+        ctx_key = next(iter(sorted(KNOWN_CONTEXT_KEYS)))
+        f = _make_func(f"def dummy_task({ctx_key}, a): ...")
+        assert make_op(f, op_kwargs={"a": "2024-01-01"}) is not None
+
+    def test_non_context_param_after_context_key_without_value_raises(self):
+        ctx_key = next(iter(sorted(KNOWN_CONTEXT_KEYS)))
+        f = _make_func(f"def dummy_task({ctx_key}, a): ...")
+        with pytest.raises(TypeError, match="missing required argument"):
+            make_op(f)
+
+    def test_bind_validation_fails_for_missing_required_args(self):
+        """Truly required args with no supplied value must still cause a bind 
failure."""
+
+        def dummy_task(required_arg):
+            return required_arg
+
+        with pytest.raises(TypeError):
+            make_op(dummy_task)
+
+    def test_variadic_and_keyword_only_params_are_not_assigned_defaults(self):
+        """Construction succeeds when variadic and keyword-only params are 
present."""
+
+        def dummy_task(a, b=1, *args, kw_required, **kwargs):
+            return a, b, args, kw_required, kwargs
+
+        assert make_op(dummy_task, op_kwargs={"a": 1, "kw_required": "x"}) is 
not None
+
+
+def make_op(func, op_args=None, op_kwargs=None, **kwargs):
+    return DummyDecoratedOperator(
+        task_id="test_task",
+        python_callable=func,
+        op_args=op_args or [],
+        op_kwargs=op_kwargs or {},
+        **kwargs,
+    )
+
+
+def _make_func(src: str):
+    ns: dict = {}
+    exec(src, ns)
+    return next(v for v in ns.values() if callable(v))
+
+
 def simple_decorator(fn):
     @functools.wraps(fn)
     def wrapper(*args, **kwargs):

Reply via email to