This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2b3fa498937 Cadwyn migration for backcompat of `/run` endpoint (#50528)
2b3fa498937 is described below
commit 2b3fa4989371996e1b2867be13efce89942d7e5d
Author: Amogh Desai <[email protected]>
AuthorDate: Wed May 14 18:31:35 2025 +0530
Cadwyn migration for backcompat of `/run` endpoint (#50528)
---
.../api_fastapi/execution_api/versions/__init__.py | 2 +
.../execution_api/versions/v2025_05_20.py | 54 ++++++++
.../versions/v2025_04_28/test_task_instances.py | 142 +++++++++++++++++++++
.../src/airflow/sdk/api/datamodels/_generated.py | 2 +-
4 files changed, 199 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 54329054c12..5462f102974 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -20,9 +20,11 @@ from __future__ import annotations
from cadwyn import HeadVersion, Version, VersionBundle
from airflow.api_fastapi.execution_api.versions.v2025_04_28 import AddRenderedMapIndexField
+from airflow.api_fastapi.execution_api.versions.v2025_05_20 import DowngradeUpstreamMapIndexes
bundle = VersionBundle(
HeadVersion(),
+ Version("2025-05-20", DowngradeUpstreamMapIndexes),
Version("2025-04-28", AddRenderedMapIndexField),
Version("2025-04-11"),
)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py
new file mode 100644
index 00000000000..f8039f0a4d3
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Optional
+
+from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema
+
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext
+
+
+class DowngradeUpstreamMapIndexes(VersionChange):
+    """Downgrade the upstream map indexes type for older clients."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = (
+        schema(TIRunContext).field("upstream_map_indexes").had(type=Optional[dict[str, int]]),  # type: ignore
+    )
+
+    @convert_response_to_previous_version_for(TIRunContext)  # type: ignore[arg-type]
+    def downgrade_upstream_map_indexes(response: ResponseInfo = None) -> None:  # type: ignore
+        """
+        Downgrades the `upstream_map_indexes` field when converting to the previous version.
+
+        Ensures that the field is only a dictionary of [str, int] (old format).
+        """
+        resp = response.body.get("upstream_map_indexes")
+        if isinstance(resp, dict):
+            downgraded = {}
+            for k, v in resp.items():
+                if isinstance(v, int):
+                    downgraded[k] = v
+                elif isinstance(v, list) and v and all(isinstance(i, int) for i in v):
+                    downgraded[k] = v[0]
+                else:
+                    # anything else (e.g. None, an empty list, a list with non-ints) becomes -1
+                    downgraded[k] = -1
+            response.body["upstream_map_indexes"] = downgraded
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
new file mode 100644
index 00000000000..fce29c4b2e5
--- /dev/null
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+from airflow.api_fastapi.common.dagbag import create_dag_bag, dag_bag_from_app
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+from tests_common.test_utils.db import clear_db_assets, clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
+DEFAULT_START_DATE = timezone.parse("2024-10-31T11:00:00Z")
+DEFAULT_END_DATE = timezone.parse("2024-10-31T12:00:00Z")
+
+
+@pytest.fixture
+def ver_client(client):
+    client.headers["Airflow-API-Version"] = "2025-04-28"  # request the pre-2025-05-20 API version
+    return client
+
+
+class TestTIUpdateState:
+    def setup_method(self):
+        clear_db_assets()
+        clear_db_runs()
+
+    def teardown_method(self):
+        clear_db_assets()
+        clear_db_runs()
+
+    @pytest.mark.parametrize(
+        "mock_indexes, expected_response_indexes",
+        [
+            pytest.param(
+                [("task_a", 5), ("task_b", 10)],
+                {"task_a": 5, "task_b": 10},
+                id="plain ints",
+            ),
+            pytest.param(
+                [("task_a", [3, 4]), ("task_b", [9])],
+                {"task_a": 3, "task_b": 9},
+                id="list of ints",
+            ),
+            pytest.param(
+                [("task_a", None), ("task_b", [6, 7]), ("task_c", 2)],
+                {"task_a": -1, "task_b": 6, "task_c": 2},
+                id="mixed types",
+            ),
+        ],
+    )
+    @patch("airflow.api_fastapi.execution_api.routes.task_instances._get_upstream_map_indexes")
+    def test_ti_run(
+        self,
+        mock_get_upstream_map_indexes,
+        ver_client,
+        session,
+        create_task_instance,
+        time_machine,
+        mock_indexes,
+        expected_response_indexes,
+    ):
+        """
+        Test that this version of the endpoint works.
+
+        Later versions modified the type of upstream_map_indexes.
+        """
+        mock_get_upstream_map_indexes.return_value = mock_indexes
+
+        instant_str = "2024-09-30T12:00:00Z"
+        instant = timezone.parse(instant_str)
+        time_machine.move_to(instant, tick=False)
+
+        ti = create_task_instance(
+            task_id="test_ti_run_state_to_running",
+            state=State.QUEUED,
+            session=session,
+            start_date=instant,
+        )
+
+        dag = ti.task.dag
+        dagbag = create_dag_bag()
+        dagbag.dags = {dag.dag_id: dag}
+        execution_app = next(route.app for route in ver_client.app.routes if route.path == "/execution")
+        execution_app.dependency_overrides[dag_bag_from_app] = lambda: dagbag
+        session.commit()
+
+        response = ver_client.patch(
+            f"/execution/task-instances/{ti.id}/run",
+            json={
+                "state": "running",
+                "hostname": "random-hostname",
+                "unixname": "random-unixname",
+                "pid": 100,
+                "start_date": instant_str,
+            },
+        )
+
+        assert response.status_code == 200
+        assert response.json() == {
+            "dag_run": {
+                "dag_id": "dag",
+                "run_id": "test",
+                "clear_number": 0,
+                "logical_date": instant_str,
+                "data_interval_start": instant.subtract(days=1).to_iso8601_string(),
+                "data_interval_end": instant_str,
+                "run_after": instant_str,
+                "start_date": instant_str,
+                "end_date": None,
+                "run_type": "manual",
+                "conf": {},
+                "consumed_asset_events": [],
+            },
+            "task_reschedule_count": 0,
+            "upstream_map_indexes": expected_response_indexes,
+            "max_tries": 0,
+            "should_retry": False,
+            "variables": [],
+            "connections": [],
+            "xcom_keys_to_clear": [],
+        }
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index dbf39fa4ae1..3efae80e5b6 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -27,7 +27,7 @@ from uuid import UUID
from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue
-API_VERSION: Final[str] = "2025-04-28"
+API_VERSION: Final[str] = "2025-05-20"
class AssetAliasReferenceAssetEventDagRun(BaseModel):