This is an automated email from the ASF dual-hosted git repository. amoghdesai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 2b3fa498937 Cadwyn migration for backcompat of `/run` endpoint (#50528) 2b3fa498937 is described below commit 2b3fa4989371996e1b2867be13efce89942d7e5d Author: Amogh Desai <amoghrajesh1...@gmail.com> AuthorDate: Wed May 14 18:31:35 2025 +0530 Cadwyn migration for backcompat of `/run` endpoint (#50528) --- .../api_fastapi/execution_api/versions/__init__.py | 2 + .../execution_api/versions/v2025_05_20.py | 54 ++++++++ .../versions/v2025_04_28/test_task_instances.py | 142 +++++++++++++++++++++ .../src/airflow/sdk/api/datamodels/_generated.py | 2 +- 4 files changed, 199 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 54329054c12..5462f102974 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -20,9 +20,11 @@ from __future__ import annotations from cadwyn import HeadVersion, Version, VersionBundle from airflow.api_fastapi.execution_api.versions.v2025_04_28 import AddRenderedMapIndexField +from airflow.api_fastapi.execution_api.versions.v2025_05_20 import DowngradeUpstreamMapIndexes bundle = VersionBundle( HeadVersion(), + Version("2025-05-20", DowngradeUpstreamMapIndexes), Version("2025-04-28", AddRenderedMapIndexField), Version("2025-04-11"), ) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py new file mode 100644 index 00000000000..f8039f0a4d3 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Optional + +from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema + +from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext + + +class DowngradeUpstreamMapIndexes(VersionChange): + """Downgrade the upstream map indexes type for older clients.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + schema(TIRunContext).field("upstream_map_indexes").had(type=Optional[dict[str, int]]), # type: ignore + ) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def downgrade_upstream_map_indexes(response: ResponseInfo = None) -> None: # type: ignore + """ + Downgrades the `upstream_map_indexes` field when converting to the previous version. + + Ensures that the field is only a dictionary of [str, int] (old format). + """ + resp = response.body.get("upstream_map_indexes") + if isinstance(resp, dict): + downgraded = {} + for k, v in resp.items(): + if isinstance(v, int): + downgraded[k] = v + elif isinstance(v, list) and v and all(isinstance(i, int) for i in v): + downgraded[k] = v[0] + else: + # for cases like None, make it -1 + downgraded[k] = -1 + response.body["upstream_map_indexes"] = downgraded diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py new file mode 100644 index 00000000000..fce29c4b2e5 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py @@ -0,0 +1,142 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from airflow.api_fastapi.common.dagbag import create_dag_bag, dag_bag_from_app +from airflow.utils import timezone +from airflow.utils.state import State + +from tests_common.test_utils.db import clear_db_assets, clear_db_runs + +pytestmark = pytest.mark.db_test + + +DEFAULT_START_DATE = timezone.parse("2024-10-31T11:00:00Z") +DEFAULT_END_DATE = timezone.parse("2024-10-31T12:00:00Z") + + +@pytest.fixture +def ver_client(client): + client.headers["Airflow-API-Version"] = "2025-04-28" + return client + + +class TestTIUpdateState: + def setup_method(self): + clear_db_assets() + clear_db_runs() + + def teardown_method(self): + clear_db_assets() + clear_db_runs() + + @pytest.mark.parametrize( + "mock_indexes, expected_response_indexes", + [ + pytest.param( + [("task_a", 5), ("task_b", 10)], + {"task_a": 5, "task_b": 10}, + id="plain ints", + ), + pytest.param( + [("task_a", [3, 4]), ("task_b", [9])], + {"task_a": 3, "task_b": 9}, + id="list of ints", + ), + pytest.param( + [("task_a", None), ("task_b", [6, 7]), ("task_c", 2)], + {"task_a": -1, "task_b": 6, "task_c": 2}, + id="mixed types", + ), + ], + ) + @patch("airflow.api_fastapi.execution_api.routes.task_instances._get_upstream_map_indexes") + def test_ti_run( + self, + mock_get_upstream_map_indexes, + ver_client, + session, + create_task_instance, + time_machine, + mock_indexes, + expected_response_indexes, + ): + """ + Test that this version of the endpoint works. + + Later versions modified the type of upstream_map_indexes. + """ + mock_get_upstream_map_indexes.return_value = mock_indexes + + instant_str = "2024-09-30T12:00:00Z" + instant = timezone.parse(instant_str) + time_machine.move_to(instant, tick=False) + + ti = create_task_instance( + task_id="test_ti_run_state_to_running", + state=State.QUEUED, + session=session, + start_date=instant, + ) + + dag = ti.task.dag + dagbag = create_dag_bag() + dagbag.dags = {dag.dag_id: dag} + execution_app = next(route.app for route in ver_client.app.routes if route.path == "/execution") + execution_app.dependency_overrides[dag_bag_from_app] = lambda: dagbag + session.commit() + + response = ver_client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "random-hostname", + "unixname": "random-unixname", + "pid": 100, + "start_date": instant_str, + }, + ) + + assert response.status_code == 200 + assert response.json() == { + "dag_run": { + "dag_id": "dag", + "run_id": "test", + "clear_number": 0, + "logical_date": instant_str, + "data_interval_start": instant.subtract(days=1).to_iso8601_string(), + "data_interval_end": instant_str, + "run_after": instant_str, + "start_date": instant_str, + "end_date": None, + "run_type": "manual", + "conf": {}, + "consumed_asset_events": [], + }, + "task_reschedule_count": 0, + "upstream_map_indexes": expected_response_indexes, + "max_tries": 0, + "should_retry": False, + "variables": [], + "connections": [], + "xcom_keys_to_clear": [], + } diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index dbf39fa4ae1..3efae80e5b6 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -27,7 +27,7 @@ from uuid import UUID from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue -API_VERSION: Final[str] = "2025-04-28" +API_VERSION: Final[str] = "2025-05-20" class AssetAliasReferenceAssetEventDagRun(BaseModel):