uranusjr commented on code in PR #50677: URL: https://github.com/apache/airflow/pull/50677#discussion_r2101683462
########## airflow-core/src/airflow/models/deadline.py: ########## @@ -183,3 +156,36 @@ def serialize_deadline_alert(self): "callback_kwargs": self.callback_kwargs, } ) + + +@provide_session +def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime: + """ + Fetch a datetime stored in the database. + + :param model_reference: SQLAlchemy Column reference (e.g., DagRun.logical_date, TaskInstance.queued_dttm, etc.) + :param conditions: Key-value pairs which are passed to the WHERE clause. + + :param session: SQLAlchemy session (provided by decorator) + """ + query = select(model_reference) + + for key, value in conditions.items(): + query = query.where(getattr(model_reference.class_, key) == value) + + # This should build a query similar to: + # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id)) + logger.debug("db query: session.scalar(%s)", query) + + try: + result = session.scalar(query) + except SQLAlchemyError as e: + logger.error("Database query failed: (%s)", str(e)) + raise + + if result is None: + message = "No matching record found in the database." + logger.error(message) Review Comment: We should include a bit more context here, such as which model and field we failed to fetch from. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org