hkc-8010 commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3353560776
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1368,6 +1368,166 @@ def test_ti_update_state_running_errors(self, client, session, create_task_insta
assert response.status_code == 422
+ def test_ti_update_state_to_success_asset_registration_failure_returns_204(
+ self, client, session, create_task_instance
+ ):
+ """Regression: asset registration failure after TI state commit must return 204, not 500.
+
+ The TI state is committed (and the row lock released) before asset registration runs.
+ If registration fails at that point, the task outcome is already durable as SUCCESS,
+ so surfacing HTTP 500 would be misleading and cause unnecessary worker retries.
+ """
+ asset = AssetModel(
+ id=42,
+ name="fail-asset",
+ uri="s3://bucket/fail-asset",
+ group="asset",
+ extra={},
+ )
+ asset_active = AssetActive.for_asset(asset)
+ session.add_all([asset, asset_active])
+
+ ti = create_task_instance(
+ task_id="test_asset_reg_failure",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ with mock.patch(
+
"airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
+ side_effect=Exception("simulated DB explosion during asset
registration"),
+ ):
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "success",
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ "task_outlets": [
+ {"name": "fail-asset", "uri":
"s3://bucket/fail-asset", "type": "Asset"}
+ ],
+ "outlet_events": [],
+ },
+ )
+
+ assert response.status_code == 204, f"Expected 204, got
{response.status_code}: {response.text}"
+ session.expire_all()
+ ti_db = session.get(TaskInstance, ti.id)
+ assert ti_db is not None
+ assert ti_db.state == TaskInstanceState.SUCCESS
+
+ def test_ti_update_state_rolls_back_partial_asset_registration_on_failure(
+ self, client, session, create_task_instance
+ ):
+ asset = AssetModel(
+ id=43,
+ name="partial-asset",
+ uri="s3://bucket/partial-asset",
+ group="asset",
+ extra={},
+ )
+ session.add_all([asset, AssetActive.for_asset(asset)])
+
+ ti = create_task_instance(
+ task_id="test_partial_asset_registration_failure",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ def add_event_then_fail(ti, task_outlets, outlet_events, session):
+ session.add(
+ AssetEvent(
+ asset_id=asset.id,
+ extra={"partial": True},
+ source_task_id=ti.task_id,
+ source_dag_id=ti.dag_id,
+ source_run_id=ti.run_id,
+ source_map_index=ti.map_index,
+ )
+ )
+ session.flush()
+ raise RuntimeError("simulated failure after partial asset
registration")
+
+ with mock.patch(
+
"airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
+ side_effect=add_event_then_fail,
+ ):
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "success",
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ "task_outlets": [
+ {"name": "partial-asset", "uri":
"s3://bucket/partial-asset", "type": "Asset"}
+ ],
+ "outlet_events": [],
+ },
+ )
+
+ assert response.status_code == 204, f"Expected 204, got
{response.status_code}: {response.text}"
+ session.expire_all()
+ ti_db = session.get(TaskInstance, ti.id)
+ assert ti_db is not None
+ assert ti_db.state == TaskInstanceState.SUCCESS
+ assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id ==
asset.id)).all() == []
+
+ def test_ti_update_state_swallow_asset_registration_commit_failure(
+ self, client, session, create_task_instance
+ ):
+ asset = AssetModel(
+ id=44,
+ name="commit-fail-asset",
+ uri="s3://bucket/commit-fail-asset",
+ group="asset",
+ extra={},
+ )
+ session.add_all([asset, AssetActive.for_asset(asset)])
+
+ ti = create_task_instance(
+ task_id="test_asset_registration_commit_failure",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ real_commit = Session.commit
+ commit_count = 0
+ failed_asset_commit = False
+
+ def fail_second_commit(session):
+ nonlocal commit_count, failed_asset_commit
+ commit_count += 1
+ if commit_count == 2:
Review Comment:
Addressed in `4673ff3`. I replaced the ordinal-based commit-failure
injection (failing the second `Session.commit` call) with an
asset-registration-aware wrapper around `register_asset_changes_in_db`, so
`clear_on_success` cannot accidentally move the injected failure onto the
wrong commit. The test now lets asset registration perform its commit and then
raises from that path, and it also asserts that the
`asset.registration_failures` metric is emitted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]