1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093560619


##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
     dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
 
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)

Review Comment:
   What's the intended behavior if no matching records found
   do we need to handle existence /  check if None here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to