1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093551012


##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
     dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
 
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:

Review Comment:
   Just for my own learning:
   do we need this layer of abstraction? It seems like we are mostly passing a 
single condition (specific to the dag ID).



##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
     dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
 
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)

Review Comment:
   I think that's inherited from `Enum`: 
https://github.com/python/cpython/blob/main/Lib/enum.py



##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
     dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
 
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)

Review Comment:
   super nit:
   consider using an f-string here



##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
     dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
 
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)
+
+    def dagrun_logical_date(self, dag_id: str) -> datetime:
+        from airflow.models import DagRun
+
+        return self._fetch_from_db(DagRun, "logical_date", dag_id=dag_id)

Review Comment:
   nit:
   Consider using an attribute reference from the model to avoid a 
magic/hard-coded string as the column name here:
   "logical_date" -> `DagRun.logical_date`
   
https://github.com/apache/airflow/blob/af06798ccc5d320e6174af9d4c7b10a7333f36e8/airflow-core/src/airflow/api/common/mark_tasks.py#L151



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to