uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101683462


##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -183,3 +156,36 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime stored in the database.
+
+    :param model_reference: SQLAlchemy Column reference (e.g., DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+    :param conditions: Key-value pairs which are passed to the WHERE clause.
+
+    :param session: SQLAlchemy session (provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)
+
+    # This should build a query similar to:
+    # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+    logger.debug("db query: session.scalar(%s)", query)
+
+    try:
+        result = session.scalar(query)
+    except SQLAlchemyError as e:
+        logger.error("Database query failed: (%s)", str(e))
+        raise
+
+    if result is None:
+        message = "No matching record found in the database."
+        logger.error(message)

Review Comment:
   We should include a bit more context here, such as which model and field we failed to fetch from.
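
   A minimal sketch of what a more descriptive message could look like. The helper name `build_not_found_message` and its parameters are hypothetical and not part of the PR. Inside `_fetch_from_db` the model and field names could presumably be taken from `model_reference.class_.__name__` and `model_reference.key`, but that is an assumption about how the caller would wire it up.

```python
from __future__ import annotations

from typing import Any


def build_not_found_message(model_name: str, field_name: str, conditions: dict[str, Any]) -> str:
    """Hypothetical helper: describe which model, field, and filters matched no row."""
    # Sort the conditions so the message is stable regardless of kwarg order.
    filters = ", ".join(f"{key}={value!r}" for key, value in sorted(conditions.items()))
    return (
        f"No matching record found in the database for "
        f"{model_name}.{field_name} (conditions: {filters or 'none'})."
    )


# Illustrative call with made-up values; in the PR these would come from
# the column reference and the **conditions passed to _fetch_from_db.
message = build_not_found_message("DagRun", "logical_date", {"dag_id": "example_dag"})
```

   The resulting string can then be logged and raised in place of the generic message, so a failing lookup points at the model, field, and filter values involved.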


