This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b3fa498937 Cadwyn migration for backcompat of `/run` endpoint (#50528)
2b3fa498937 is described below

commit 2b3fa4989371996e1b2867be13efce89942d7e5d
Author: Amogh Desai <amoghrajesh1...@gmail.com>
AuthorDate: Wed May 14 18:31:35 2025 +0530

    Cadwyn migration for backcompat of `/run` endpoint (#50528)
---
 .../api_fastapi/execution_api/versions/__init__.py |   2 +
 .../execution_api/versions/v2025_05_20.py          |  54 ++++++++
 .../versions/v2025_04_28/test_task_instances.py    | 142 +++++++++++++++++++++
 .../src/airflow/sdk/api/datamodels/_generated.py   |   2 +-
 4 files changed, 199 insertions(+), 1 deletion(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 54329054c12..5462f102974 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -20,9 +20,11 @@ from __future__ import annotations
 from cadwyn import HeadVersion, Version, VersionBundle
 
 from airflow.api_fastapi.execution_api.versions.v2025_04_28 import 
AddRenderedMapIndexField
+from airflow.api_fastapi.execution_api.versions.v2025_05_20 import 
DowngradeUpstreamMapIndexes
 
 bundle = VersionBundle(
     HeadVersion(),
+    Version("2025-05-20", DowngradeUpstreamMapIndexes),
     Version("2025-04-28", AddRenderedMapIndexField),
     Version("2025-04-11"),
 )
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py
new file mode 100644
index 00000000000..f8039f0a4d3
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Optional
+
+from cadwyn import ResponseInfo, VersionChange, 
convert_response_to_previous_version_for, schema
+
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import 
TIRunContext
+
+
+class DowngradeUpstreamMapIndexes(VersionChange):
+    """Downgrade the upstream map indexes type for older clients."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = (
+        
schema(TIRunContext).field("upstream_map_indexes").had(type=Optional[dict[str, 
int]]),  # type: ignore
+    )
+
+    @convert_response_to_previous_version_for(TIRunContext)  # type: 
ignore[arg-type]
+    def downgrade_upstream_map_indexes(response: ResponseInfo = None) -> None: 
 # type: ignore
+        """
+        Downgrades the `upstream_map_indexes` field when converting to the 
previous version.
+
+        Ensures that the field is only a dictionary of  [str, int] (old 
format).
+        """
+        resp = response.body.get("upstream_map_indexes")
+        if isinstance(resp, dict):
+            downgraded = {}
+            for k, v in resp.items():
+                if isinstance(v, int):
+                    downgraded[k] = v
+                elif isinstance(v, list) and v and all(isinstance(i, int) for 
i in v):
+                    downgraded[k] = v[0]
+                else:
+                    # for cases like None, make it -1
+                    downgraded[k] = -1
+            response.body["upstream_map_indexes"] = downgraded
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
new file mode 100644
index 00000000000..fce29c4b2e5
--- /dev/null
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+from airflow.api_fastapi.common.dagbag import create_dag_bag, dag_bag_from_app
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+from tests_common.test_utils.db import clear_db_assets, clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
+DEFAULT_START_DATE = timezone.parse("2024-10-31T11:00:00Z")
+DEFAULT_END_DATE = timezone.parse("2024-10-31T12:00:00Z")
+
+
+@pytest.fixture
+def ver_client(client):
+    client.headers["Airflow-API-Version"] = "2025-04-28"
+    return client
+
+
+class TestTIUpdateState:
+    def setup_method(self):
+        clear_db_assets()
+        clear_db_runs()
+
+    def teardown_method(self):
+        clear_db_assets()
+        clear_db_runs()
+
+    @pytest.mark.parametrize(
+        "mock_indexes, expected_response_indexes",
+        [
+            pytest.param(
+                [("task_a", 5), ("task_b", 10)],
+                {"task_a": 5, "task_b": 10},
+                id="plain ints",
+            ),
+            pytest.param(
+                [("task_a", [3, 4]), ("task_b", [9])],
+                {"task_a": 3, "task_b": 9},
+                id="list of ints",
+            ),
+            pytest.param(
+                [("task_a", None), ("task_b", [6, 7]), ("task_c", 2)],
+                {"task_a": -1, "task_b": 6, "task_c": 2},
+                id="mixed types",
+            ),
+        ],
+    )
+    
@patch("airflow.api_fastapi.execution_api.routes.task_instances._get_upstream_map_indexes")
+    def test_ti_run(
+        self,
+        mock_get_upstream_map_indexes,
+        ver_client,
+        session,
+        create_task_instance,
+        time_machine,
+        mock_indexes,
+        expected_response_indexes,
+    ):
+        """
+        Test that this version of the endpoint works.
+
+        Later versions modified the type of upstream_map_indexes.
+        """
+        mock_get_upstream_map_indexes.return_value = mock_indexes
+
+        instant_str = "2024-09-30T12:00:00Z"
+        instant = timezone.parse(instant_str)
+        time_machine.move_to(instant, tick=False)
+
+        ti = create_task_instance(
+            task_id="test_ti_run_state_to_running",
+            state=State.QUEUED,
+            session=session,
+            start_date=instant,
+        )
+
+        dag = ti.task.dag
+        dagbag = create_dag_bag()
+        dagbag.dags = {dag.dag_id: dag}
+        execution_app = next(route.app for route in ver_client.app.routes if 
route.path == "/execution")
+        execution_app.dependency_overrides[dag_bag_from_app] = lambda: dagbag
+        session.commit()
+
+        response = ver_client.patch(
+            f"/execution/task-instances/{ti.id}/run",
+            json={
+                "state": "running",
+                "hostname": "random-hostname",
+                "unixname": "random-unixname",
+                "pid": 100,
+                "start_date": instant_str,
+            },
+        )
+
+        assert response.status_code == 200
+        assert response.json() == {
+            "dag_run": {
+                "dag_id": "dag",
+                "run_id": "test",
+                "clear_number": 0,
+                "logical_date": instant_str,
+                "data_interval_start": 
instant.subtract(days=1).to_iso8601_string(),
+                "data_interval_end": instant_str,
+                "run_after": instant_str,
+                "start_date": instant_str,
+                "end_date": None,
+                "run_type": "manual",
+                "conf": {},
+                "consumed_asset_events": [],
+            },
+            "task_reschedule_count": 0,
+            "upstream_map_indexes": expected_response_indexes,
+            "max_tries": 0,
+            "should_retry": False,
+            "variables": [],
+            "connections": [],
+            "xcom_keys_to_clear": [],
+        }
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index dbf39fa4ae1..3efae80e5b6 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -27,7 +27,7 @@ from uuid import UUID
 
 from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue
 
-API_VERSION: Final[str] = "2025-04-28"
+API_VERSION: Final[str] = "2025-05-20"
 
 
 class AssetAliasReferenceAssetEventDagRun(BaseModel):

Reply via email to