This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f2fe0df6b3caa86a4315322264fad077f03b32e6 Author: Jarek Potiuk <ja...@potiuk.com> AuthorDate: Mon Feb 7 20:12:05 2022 +0100 Avoid deadlock when rescheduling task (#21362) The scheduler job performs scheduling after locking the "scheduled" DagRun row for writing. This should prevent the DagRun and its related task instances from being modified by another scheduler, or by a "mini-scheduler" run after a task completes. However, there is apparently one more case where the DagRun is locked by "Task" processes - namely when a task raises AirflowRescheduleException. In this case a new "TaskReschedule" entity is inserted into the database, and that insert also takes a lock on the DagRun (because TaskReschedule has a "DagRun" relationship). This PR modifies the handling of AirflowRescheduleException so that it obtains the very same DagRun lock before it attempts to insert the TaskReschedule entity. It seems that TaskReschedule is the only entity with this relationship, so likely all the mysterious SchedulerJob deadlock cases we have experienced might be explained (and fixed) by this one. 
It is likely that this one: * Fixes: #16982 * Fixes: #19957 (cherry picked from commit 6d110b565a505505351d1ff19592626fb24e4516) --- airflow/models/taskinstance.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index ec34156..2dcc923 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -93,7 +93,7 @@ from airflow.utils.operator_helpers import context_to_airflow_vars from airflow.utils.platform import getuser from airflow.utils.retries import run_with_db_retries from airflow.utils.session import create_session, provide_session -from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime +from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks from airflow.utils.state import DagRunState, State from airflow.utils.timeout import timeout @@ -1657,11 +1657,24 @@ class TaskInstance(Base, LoggingMixin): # Don't record reschedule request in test mode if test_mode: return + + from airflow.models.dagrun import DagRun # Avoid circular import + self.refresh_from_db(session) self.end_date = timezone.utcnow() self.set_duration() + # Lock DAG run to be sure not to get into a deadlock situation when trying to insert + # TaskReschedule which apparently also creates lock on corresponding DagRun entity + with_row_locks( + session.query(DagRun).filter_by( + dag_id=self.dag_id, + run_id=self.run_id, + ), + session=session, + ).one() + # Log reschedule request session.add( TaskReschedule(