hkc-8010 commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3353560776


##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1368,6 +1368,166 @@ def test_ti_update_state_running_errors(self, client, 
session, create_task_insta
 
         assert response.status_code == 422
 
+    def test_ti_update_state_to_success_asset_registration_failure_returns_204(
+        self, client, session, create_task_instance
+    ):
+        """Regression: asset registration failure after TI state commit must 
return 204, not 500.
+
+        The TI state is committed (and the row lock released) before asset 
registration runs.
+        If registration fails at that point, the task outcome is already 
durable as SUCCESS,
+        so surfacing HTTP 500 would be misleading and cause unnecessary worker 
retries.
+        """
+        asset = AssetModel(
+            id=42,
+            name="fail-asset",
+            uri="s3://bucket/fail-asset",
+            group="asset",
+            extra={},
+        )
+        asset_active = AssetActive.for_asset(asset)
+        session.add_all([asset, asset_active])
+
+        ti = create_task_instance(
+            task_id="test_asset_reg_failure",
+            start_date=DEFAULT_START_DATE,
+            state=State.RUNNING,
+        )
+        session.commit()
+
+        with mock.patch(
+            
"airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
+            side_effect=Exception("simulated DB explosion during asset 
registration"),
+        ):
+            response = client.patch(
+                f"/execution/task-instances/{ti.id}/state",
+                json={
+                    "state": "success",
+                    "end_date": DEFAULT_END_DATE.isoformat(),
+                    "task_outlets": [
+                        {"name": "fail-asset", "uri": 
"s3://bucket/fail-asset", "type": "Asset"}
+                    ],
+                    "outlet_events": [],
+                },
+            )
+
+        assert response.status_code == 204, f"Expected 204, got 
{response.status_code}: {response.text}"
+        session.expire_all()
+        ti_db = session.get(TaskInstance, ti.id)
+        assert ti_db is not None
+        assert ti_db.state == TaskInstanceState.SUCCESS
+
+    def test_ti_update_state_rolls_back_partial_asset_registration_on_failure(
+        self, client, session, create_task_instance
+    ):
+        asset = AssetModel(
+            id=43,
+            name="partial-asset",
+            uri="s3://bucket/partial-asset",
+            group="asset",
+            extra={},
+        )
+        session.add_all([asset, AssetActive.for_asset(asset)])
+
+        ti = create_task_instance(
+            task_id="test_partial_asset_registration_failure",
+            start_date=DEFAULT_START_DATE,
+            state=State.RUNNING,
+        )
+        session.commit()
+
+        def add_event_then_fail(ti, task_outlets, outlet_events, session):
+            session.add(
+                AssetEvent(
+                    asset_id=asset.id,
+                    extra={"partial": True},
+                    source_task_id=ti.task_id,
+                    source_dag_id=ti.dag_id,
+                    source_run_id=ti.run_id,
+                    source_map_index=ti.map_index,
+                )
+            )
+            session.flush()
+            raise RuntimeError("simulated failure after partial asset 
registration")
+
+        with mock.patch(
+            
"airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
+            side_effect=add_event_then_fail,
+        ):
+            response = client.patch(
+                f"/execution/task-instances/{ti.id}/state",
+                json={
+                    "state": "success",
+                    "end_date": DEFAULT_END_DATE.isoformat(),
+                    "task_outlets": [
+                        {"name": "partial-asset", "uri": 
"s3://bucket/partial-asset", "type": "Asset"}
+                    ],
+                    "outlet_events": [],
+                },
+            )
+
+        assert response.status_code == 204, f"Expected 204, got 
{response.status_code}: {response.text}"
+        session.expire_all()
+        ti_db = session.get(TaskInstance, ti.id)
+        assert ti_db is not None
+        assert ti_db.state == TaskInstanceState.SUCCESS
+        assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == 
asset.id)).all() == []
+
+    def test_ti_update_state_swallow_asset_registration_commit_failure(
+        self, client, session, create_task_instance
+    ):
+        asset = AssetModel(
+            id=44,
+            name="commit-fail-asset",
+            uri="s3://bucket/commit-fail-asset",
+            group="asset",
+            extra={},
+        )
+        session.add_all([asset, AssetActive.for_asset(asset)])
+
+        ti = create_task_instance(
+            task_id="test_asset_registration_commit_failure",
+            start_date=DEFAULT_START_DATE,
+            state=State.RUNNING,
+        )
+        session.commit()
+
+        real_commit = Session.commit
+        commit_count = 0
+        failed_asset_commit = False
+
+        def fail_second_commit(session):
+            nonlocal commit_count, failed_asset_commit
+            commit_count += 1
+            if commit_count == 2:

Review Comment:
   Addressed in `4673ff3`. I replaced the ordinal-based commit-failure 
injection with an asset-registration-aware wrapper around 
`register_asset_changes_in_db`, so `clear_on_success` can no longer 
accidentally move the injected failure onto the wrong commit. The test now 
lets asset registration perform its commit and then raises from that path, and 
it also asserts that the `asset.registration_failures` metric is emitted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to