Usuychik commented on code in PR #66765:
URL: https://github.com/apache/airflow/pull/66765#discussion_r3233701230
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1423,6 +1423,48 @@ def test_ti_update_state_database_error(self, client, session, create_task_insta
         assert response.status_code == 500
         assert response.json()["detail"] == "Database error occurred"
+    def test_ti_update_state_terminal_does_not_lock_dag_run(self, client, session, create_task_instance):
+        """
+        Regression guard: session.get(TI, pk, with_for_update=True) must use
+        options=[lazyload(TI.dag_run)] to avoid inadvertently locking dag_run.
+
+        TaskInstance.dag_run has lazy="joined", so without the lazyload override the
+        ORM emits FOR UPDATE on both task_instance and dag_run. The scheduler holds
+        a dag_run lock while bulk-updating task_instance rows in
+        _verify_integrity_if_dag_changed, producing a lock-order inversion deadlock.
+        """
+        from sqlalchemy.orm import lazyload
+
+        ti = create_task_instance(
+            task_id="test_ti_update_state_no_dag_run_lock",
+            state=State.RUNNING,
+            start_date=DEFAULT_START_DATE,
+        )
+        session.commit()
Review Comment:
I could be wrong, since I don't know the Airflow stack in depth, but from
what I can see, this is why session.commit() is required here:
The HTTP client that hits the route handler opens its own separate DB
session. The data created by create_task_instance must be committed before
the client request so that the route's session can see it. This is why every
test in TestTIUpdateState does:
    ti = create_task_instance(...)
    session.commit()  # makes the TI visible to the route's session
    response = client.patch(...)
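
To illustrate the visibility point in isolation, here is a minimal sketch using only the standard-library sqlite3 module. It is an analogy, not Airflow's fixture code: the two connections, the simplified task_instance table, and the helper names count_rows and visibility_demo are all assumptions made up for the example. One connection plays the role of the test's session, the other the role of the route handler's session.

```python
import os
import sqlite3
import tempfile


def count_rows(conn):
    # fetchall() exhausts the cursor so the read lock is released promptly.
    return conn.execute("SELECT COUNT(*) FROM task_instance").fetchall()[0][0]


def visibility_demo():
    """Return the row counts a second connection sees before and after commit."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        fixture = sqlite3.connect(path)  # hypothetical stand-in for the test's session
        route = sqlite3.connect(path)    # hypothetical stand-in for the route's session
        try:
            # Simplified table, not Airflow's real schema.
            fixture.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT)")
            fixture.commit()

            # The insert opens a transaction on `fixture` that is not yet committed.
            fixture.execute("INSERT INTO task_instance VALUES ('demo_task', 'running')")
            before = count_rows(route)  # the other connection cannot see the row yet

            fixture.commit()
            after = count_rows(route)   # now the committed row is visible
        finally:
            fixture.close()
            route.close()
    return before, after


if __name__ == "__main__":
    print(visibility_demo())
```

If the route in the real test does run on a separate session, as suggested above, the same ordering applies: without the commit, client.patch(...) would look up a task instance that its session cannot see.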