This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 94865f9c6b780ab80bc78f6287752c426e769c60 Author: Jarek Potiuk <[email protected]> AuthorDate: Tue Dec 7 16:05:47 2021 +0100 Adds retry on taskinstance retrieval lock (#20030) Fixes: #19832 Co-authored-by: Jaroslaw Potiuk <[email protected]> (cherry picked from commit 78c815e22b67e442982b53f41d7d899723d5de9f) --- airflow/models/taskinstance.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 6e9862e..f37cada 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -91,6 +91,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname from airflow.utils.operator_helpers import context_to_airflow_vars from airflow.utils.platform import getuser +from airflow.utils.retries import run_with_db_retries from airflow.utils.session import create_session, provide_session from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime from airflow.utils.state import DagRunState, State @@ -723,7 +724,9 @@ class TaskInstance(Base, LoggingMixin): ) if lock_for_update: - ti: Optional[TaskInstance] = qry.with_for_update().first() + for attempt in run_with_db_retries(logger=self.log): + with attempt: + ti: Optional[TaskInstance] = qry.with_for_update().first() else: ti = qry.first() if ti:
