This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a6ea6f9a082 Validate map_index range in CLI commands (#62626)
a6ea6f9a082 is described below

commit a6ea6f9a0827126ef8968985f56c221385491fad
Author: leon.jeon <[email protected]>
AuthorDate: Wed Mar 11 12:41:10 2026 +0000

    Validate map_index range in CLI commands (#62626)
    
    * Validate map_index range in CLI commands
    
    When a user passes a map_index to CLI commands like `tasks render` or
    `tasks test`, validate that it falls within the parse-time mapped count.
    This gives a clear error message instead of silently proceeding with an
    invalid index. For dynamically mapped tasks (XCom-based), the validation
    is skipped since the count is not available at parse time.
    
    closes: #60463
    
    Co-Authored-By: claude-flow <[email protected]>
    
    * Raise clear error for map_index on non-mapped tasks
    
    Split the except block so NotMapped raises a ValueError instead of
    being silently ignored alongside NotFullyPopulated. Also switch the
    dynamic-mapping test to the standard pytest.raises pattern.
    
    Co-Authored-By: claude-flow <[email protected]>
    
    * Fix ruff format: collapse single-line raise ValueError
    
    Co-Authored-By: claude-flow <[email protected]>
    
    * Use AirflowConsole for non-JSON pool export output
    
    Replace `rich.print(pools_list)` with `AirflowConsole().print_as()` so
    non-JSON export formats (table, yaml, plain) are rendered consistently
    with other airflow-ctl commands. Add test verifying the delegation.
    
    Co-Authored-By: claude-flow <[email protected]>
    
    * Revert "Use AirflowConsole for non-JSON pool export output"
    
    This reverts commit 5dc87ee8a66edaa09b23faf59ee7c42510e483c1.
    
    ---------
    
    Co-authored-by: claude-flow <[email protected]>
---
 .../src/airflow/cli/commands/task_command.py       | 16 +++++-
 .../tests/unit/cli/commands/test_task_command.py   | 62 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/cli/commands/task_command.py 
b/airflow-core/src/airflow/cli/commands/task_command.py
index 3cdd5c9f619..e615d63ccf4 100644
--- a/airflow-core/src/airflow/cli/commands/task_command.py
+++ b/airflow-core/src/airflow/cli/commands/task_command.py
@@ -31,10 +31,11 @@ from airflow import settings
 from airflow._shared.timezones import timezone
 from airflow.cli.simple_table import AirflowConsole
 from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
-from airflow.exceptions import AirflowConfigException, DagRunNotFound, 
TaskInstanceNotFound
+from airflow.exceptions import AirflowConfigException, DagRunNotFound, 
NotMapped, TaskInstanceNotFound
 from airflow.models import TaskInstance
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagrun import DagRun, get_or_create_dagrun
+from airflow.models.expandinput import NotFullyPopulated
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.sdk.definitions.dag import DAG, _run_task
 from airflow.sdk.definitions.param import ParamsDict
@@ -203,7 +204,18 @@ def _get_ti(
                 f"TaskInstance for {dag.dag_id}, {task.task_id}, 
map={map_index} with "
                 f"run_id or logical_date of {logical_date_or_run_id!r} not 
found"
             )
-        # TODO: Validate map_index is in range?
+        if map_index >= 0:
+            try:
+                total = task.get_parse_time_mapped_ti_count()
+                if map_index >= total:
+                    raise ValueError(
+                        f"map_index {map_index} is out of range. "
+                        f"Task '{task.task_id}' has {total} mapped instance(s) 
[0..{total - 1}]."
+                    )
+            except NotFullyPopulated:
+                pass  # Dynamic mapping — cannot validate at parse time
+            except NotMapped:
+                raise ValueError(f"Task '{task.task_id}' is not mapped; 
map_index must be -1.")
         dag_version = DagVersion.get_latest_version(dag.dag_id, 
session=session)
         if not dag_version:
             # TODO: Remove this once DagVersion.get_latest_version is 
guaranteed to return a DagVersion/raise
diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py 
b/airflow-core/tests/unit/cli/commands/test_task_command.py
index 7ba2f3cc4bf..8d96d5579ec 100644
--- a/airflow-core/tests/unit/cli/commands/test_task_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_task_command.py
@@ -310,6 +310,68 @@ class TestCliTasks:
         assert "[3]" not in output
         assert "property: op_args" in output
 
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_mapped_task_render_out_of_range_map_index(self):
+        """Raise ValueError when map_index exceeds the parse-time mapped 
count."""
+        with pytest.raises(ValueError, match=r"map_index 5 is out of range.*3 
mapped instance"):
+            task_command.task_render(
+                self.parser.parse_args(
+                    [
+                        "tasks",
+                        "render",
+                        "test_mapped_classic",
+                        "consumer_literal",
+                        "2022-01-01",
+                        "--map-index",
+                        "5",
+                    ]
+                )
+            )
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_mapped_task_render_boundary_map_index(self):
+        """Render should succeed for the last valid map_index (count - 1)."""
+        with redirect_stdout(io.StringIO()) as stdout:
+            task_command.task_render(
+                self.parser.parse_args(
+                    [
+                        "tasks",
+                        "render",
+                        "test_mapped_classic",
+                        "consumer_literal",
+                        "2022-01-01",
+                        "--map-index",
+                        "2",
+                    ]
+                )
+            )
+        output = stdout.getvalue()
+        assert "[3]" in output
+        assert "property: op_args" in output
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_mapped_task_render_dynamic_skips_validation(self):
+        """Dynamic (XCom-based) mapping should skip map_index validation."""
+        # consumer depends on XCom from make_arg_lists, so parse-time count
+        # is not available. Validation should be skipped (NotFullyPopulated).
+        # The render may fail for other reasons, but not with our
+        # "out of range" ValueError.
+        with pytest.raises(Exception) as exc_info:  # noqa: PT011
+            task_command.task_render(
+                self.parser.parse_args(
+                    [
+                        "tasks",
+                        "render",
+                        "test_mapped_classic",
+                        "consumer",
+                        "2022-01-01",
+                        "--map-index",
+                        "999",
+                    ]
+                )
+            )
+        assert "out of range" not in str(exc_info.value)
+
     def test_mapped_task_render_with_template(self, dag_maker):
         """
         tasks render should render and displays templated fields for a given 
mapping task

Reply via email to