ephraimbuddy commented on code in PR #68118:
URL: https://github.com/apache/airflow/pull/68118#discussion_r3368171141
##########
airflow-core/src/airflow/utils/sqlalchemy.py:
##########
@@ -461,6 +461,48 @@ def lock_rows(query: Select, session: Session) ->
Generator[None, None, None]:
del locked_rows
+@contextlib.contextmanager
+def with_db_lock_timeout(session: Session, lock_timeout: int = 30) ->
Generator[None, None, None]:
+ """
+ Context manager to set the database lock timeout for the current session.
+
+ This prevents long-running operations from blocking indefinitely if they
encounter
+ lock contention. Only supported on PostgreSQL and MySQL.
+
+ :param session: ORM Session
+ :param lock_timeout: Lock timeout in seconds.
+ """
+ if lock_timeout <= 0:
+ raise ValueError("lock_timeout must be a positive integer number of
seconds")
+
+ try:
+ dialect_name = get_dialect_name(session)
+ except ValueError:
+ dialect_name = None
+
+ old_mysql_timeout = None
+
+ if dialect_name == "postgresql":
+ # SET LOCAL applies only to the current transaction and resets on
COMMIT/ROLLBACK.
+ session.execute(text(f"SET LOCAL lock_timeout = '{lock_timeout}s'"))
+ elif dialect_name == "mysql":
+ old_mysql_timeout = session.execute(text("SELECT
@@SESSION.innodb_lock_wait_timeout")).scalar()
+ session.execute(text(f"SET SESSION innodb_lock_wait_timeout =
{lock_timeout}"))
+ else:
+ log.warning(
+ "Database lock timeout is not supported for dialect '%s'. "
+ "The requested timeout of %ss will not be applied.",
Review Comment:
This would fire on every deactivate call on unsupported dialects. We should
probably use debug logging here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]