pierrejeambrun commented on code in PR #61550:
URL: https://github.com/apache/airflow/pull/61550#discussion_r3281945925
##########
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py:
##########
@@ -202,6 +202,26 @@ class AddDagEndpoint(VersionChange):
instructions_to_migrate_to_previous_version = (endpoint("/dags/{dag_id}",
["GET"]).didnt_exist,)
+class AddBundleVersionField(VersionChange):
Review Comment:
**Blocker.** `v2026_04_06.py` shipped in 3.2.0 and 3.2.1. Per
[`contributing-docs/19_execution_api_versioning.rst`](https://github.com/apache/airflow/blob/main/contributing-docs/19_execution_api_versioning.rst),
new migrations only go into **unreleased** versions. Move
`AddBundleVersionField` to a new CalVer file dated to the planned release, and
update `versions/__init__.py`.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -615,16 +633,25 @@ def trigger_dag_run(
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
partition_key=params["partition_key"],
+ bundle_version=body.bundle_version,
+ dag_version=resolved_dag_version,
session=session,
)
+
+ dag_run_note = body.note
+ if dag_run_note:
+ current_user_id = user.get_id()
+ dag_run.note = (dag_run_note, current_user_id)
+ return dag_run
+
except (ParamValidationError, ValueError) as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e
-
- dag_run_note = body.note
- if dag_run_note:
- current_user_id = user.get_id()
- dag_run.note = (dag_run_note, current_user_id)
- return dag_run
+ except AirflowNotFoundException as e:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, str(e))
+ except AirflowException as e:
Review Comment:
Carry-over from my Mar 6 comment. The catch is generic 400 but it forwards
`str(e)`, so messages like "Cannot create DagRun ... because the dag is not
serialized" leak through as a 400. Either narrow the catch to the specific
exceptions `create_dagrun` raises as user errors, or replace the message with a
generic one before raising.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -600,10 +600,28 @@ def trigger_dag_run(
else:
triggered_by = DagRunTriggeredByType.REST_API
- dag = get_latest_version_of_dag(dag_bag, dag_id, session)
- params = body.validate_context(dag)
-
try:
+ dag = get_latest_version_of_dag(dag_bag, dag_id, session)
+
+ resolved_dag_version = None
+ if body.bundle_version is not None:
Review Comment:
Carry-over from my Feb 17 / 20 comment. The DB re-fetch is now gone (good),
but the `disable_bundle_versioning` check, the `get_latest_version` call, and
the 404 still live in both the route AND `_create_orm_dagrun`. Move it fully
into `_create_orm_dagrun` so the route just passes `bundle_version` and catches
a specific exception.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -615,16 +633,25 @@ def trigger_dag_run(
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
partition_key=params["partition_key"],
+ bundle_version=body.bundle_version,
+ dag_version=resolved_dag_version,
session=session,
)
+
+ dag_run_note = body.note
+ if dag_run_note:
+ current_user_id = user.get_id()
+ dag_run.note = (dag_run_note, current_user_id)
+ return dag_run
+
except (ParamValidationError, ValueError) as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e
-
- dag_run_note = body.note
- if dag_run_note:
- current_user_id = user.get_id()
- dag_run.note = (dag_run_note, current_user_id)
- return dag_run
+ except AirflowNotFoundException as e:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, str(e))
+ except AirflowException as e:
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))
+ except ValueError as e:
Review Comment:
Dead branch — `ValueError` is already caught in the first tuple above. Drop
it.
```python
except (ParamValidationError, ValueError) as e:
raise HTTPException(...) from e
...
except ValueError as e: # never reached
raise HTTPException(...)
```
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -615,16 +633,25 @@ def trigger_dag_run(
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
partition_key=params["partition_key"],
+ bundle_version=body.bundle_version,
+ dag_version=resolved_dag_version,
session=session,
)
+
+ dag_run_note = body.note
+ if dag_run_note:
+ current_user_id = user.get_id()
+ dag_run.note = (dag_run_note, current_user_id)
+ return dag_run
+
except (ParamValidationError, ValueError) as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e
-
- dag_run_note = body.note
- if dag_run_note:
- current_user_id = user.get_id()
- dag_run.note = (dag_run_note, current_user_id)
- return dag_run
+ except AirflowNotFoundException as e:
Review Comment:
Three of the four except blocks lost their `from e`. Add it back so
tracebacks survive.
##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -69,6 +69,21 @@
log = structlog.get_logger(__name__)
+# Callbacks are not serialized, so when running an older bundle version we
copy them
+# from the live dag to the resolved dag so dag-level event hooks still fire
correctly.
+# Other DAG-level attrs (max_active_runs, catchup, params, …) intentionally
come from
+# the old serialized version, since the caller asked for that specific
historical snapshot.
+_DAG_CALLBACK_ATTRS = (
Review Comment:
`_DAG_CALLBACK_ATTRS` lists 8 attrs but
`test_create_dagrun_callbacks_copied_to_resolved_bundle_version` only asserts
on 2 (`on_failure_callback`, `on_success_callback`). Parametrize the test over
the tuple, or add at least one of `sla_miss_callback` / `on_retry_callback`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]