Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-06-02 Thread via GitHub


ferruzzi merged PR #50677:
URL: https://github.com/apache/airflow/pull/50677


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-29 Thread via GitHub


ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2920837957

   @uranusjr if you are happy with it, can you approve when you get time?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-27 Thread via GitHub


ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2914568562

   @ashb - When you get a second can you have a look and see if I broke this 
down right?   The part of the code that the user directly interacts with (and 
might be imported into a DAG) now lives in 
`airflow/sdk/definitions/deadline.py` and everything else that I don't want 
them using directly is in `airflow/models/deadline.py`.  
   
   I  think when I asked about it previously, the breakdown was "required db" 
vs "non-db" but this feels like a more natural break for this... is it still 
alright or do i need to put it back to all in the sdk path except the 
_fetch_from_db method?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-27 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2110590988


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -186,3 +159,52 @@ def serialize_deadline_alert(self):
 "callback_kwargs": self.callback_kwargs,
 }
 )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
+"""
+Fetch a datetime value from the database using the provided model 
reference and filtering conditions.
+
+For example, to fetch a TaskInstance's start_date:
+_fetch_from_db(
+TaskInstance.start_date, dag_id='example_dag', 
task_id='example_task', run_id='example_run'
+)
+
+This generates SQL equivalent to:
+SELECT start_date
+FROM task_instance
+WHERE dag_id = 'example_dag'
+AND task_id = 'example_task'
+AND run_id = 'example_run'
+
+:param model_reference: SQLAlchemy Column to select (e.g., 
DagRun.logical_date, TaskInstance.start_date)
+:param conditions: Filtering conditions applied as equality comparisons in 
the WHERE clause.
+   Multiple conditions are combined with AND.
+:param session: SQLAlchemy session (auto-provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)
+
+compiled_query = query.compile(compile_kwargs={"literal_binds": True})
+pretty_query = "\n".join(str(compiled_query).splitlines())
+logger.debug(
+"Executing query:\n%r\nAs SQL:\n%s",
+query,
+pretty_query,
+)
+
+try:
+result = session.scalar(query)
+except SQLAlchemyError as e:
+logger.error("Database query failed: (%s)", str(e))

Review Comment:
   Is there a similar way to simplify the block below that which is currently:
   
   ```
   if result is None:
   message = f"No matching record found in the database for query:\n
{pretty_query}"
   logger.error(message)
   raise ValueError(message)
   ```
   
   I always felt python needed a "log then raise" method, but maybe there has 
been one all along?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-27 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2110516180


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##


Review Comment:
   For the first one, I think I maybe should move 
`models.deadline.DeadlineAlert` into the SDK as well since it is a direct user 
interface, which means it looks like I may have other use-cases where I want 
stuff in that path, so that works.  I'll rename the module.
   
   For the second comment,  I'm sorry, I need some more context.  I don't 
really follow what you are suggesting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-27 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2110464534


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -186,3 +159,52 @@ def serialize_deadline_alert(self):
 "callback_kwargs": self.callback_kwargs,
 }
 )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
+"""
+Fetch a datetime value from the database using the provided model 
reference and filtering conditions.
+
+For example, to fetch a TaskInstance's start_date:
+_fetch_from_db(
+TaskInstance.start_date, dag_id='example_dag', 
task_id='example_task', run_id='example_run'
+)
+
+This generates SQL equivalent to:
+SELECT start_date
+FROM task_instance
+WHERE dag_id = 'example_dag'
+AND task_id = 'example_task'
+AND run_id = 'example_run'
+
+:param model_reference: SQLAlchemy Column to select (e.g., 
DagRun.logical_date, TaskInstance.start_date)
+:param conditions: Filtering conditions applied as equality comparisons in 
the WHERE clause.
+   Multiple conditions are combined with AND.
+:param session: SQLAlchemy session (auto-provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)
+
+compiled_query = query.compile(compile_kwargs={"literal_binds": True})
+pretty_query = "\n".join(str(compiled_query).splitlines())
+logger.debug(
+"Executing query:\n%r\nAs SQL:\n%s",
+query,
+pretty_query,
+)
+
+try:
+result = session.scalar(query)
+except SQLAlchemyError as e:
+logger.error("Database query failed: (%s)", str(e))

Review Comment:
   That's actually pretty great.   I'll be honest, I hadn't realized 
log.exception worked that way.  I'll get it done, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-27 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2110463728


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -100,35 +102,6 @@ def add_deadline(cls, deadline: Deadline, session: Session 
= NEW_SESSION):
 session.add(deadline)
 
 
-class DeadlineReference(Enum):

Review Comment:
   I don't think so.  This was added it in the previous PR purely as a 
placeholder so I could give an example of how something else will be used and 
the note on L107 made that pretty clear.  Nobody should have been using it for 
anything, I think it's safe to just drop it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-26 Thread via GitHub


uranusjr commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2910968578

   Some minor comments above; none of them are blocking, jut personal opinions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-26 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2108066688


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##


Review Comment:
   Would it be a good idea to name this just `deadline`? This would match the 
core module, is shorter but still clear enough (I think).
   
   Also, it’s probably a good idea to expose DeadlineReference to `airflow.sdk` 
without needing to do subpackage import. It is a user-facing class, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-26 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2108062361


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -186,3 +159,52 @@ def serialize_deadline_alert(self):
 "callback_kwargs": self.callback_kwargs,
 }
 )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
+"""
+Fetch a datetime value from the database using the provided model 
reference and filtering conditions.
+
+For example, to fetch a TaskInstance's start_date:
+_fetch_from_db(
+TaskInstance.start_date, dag_id='example_dag', 
task_id='example_task', run_id='example_run'
+)
+
+This generates SQL equivalent to:
+SELECT start_date
+FROM task_instance
+WHERE dag_id = 'example_dag'
+AND task_id = 'example_task'
+AND run_id = 'example_run'
+
+:param model_reference: SQLAlchemy Column to select (e.g., 
DagRun.logical_date, TaskInstance.start_date)
+:param conditions: Filtering conditions applied as equality comparisons in 
the WHERE clause.
+   Multiple conditions are combined with AND.
+:param session: SQLAlchemy session (auto-provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)
+
+compiled_query = query.compile(compile_kwargs={"literal_binds": True})
+pretty_query = "\n".join(str(compiled_query).splitlines())
+logger.debug(
+"Executing query:\n%r\nAs SQL:\n%s",
+query,
+pretty_query,
+)
+
+try:
+result = session.scalar(query)
+except SQLAlchemyError as e:
+logger.error("Database query failed: (%s)", str(e))

Review Comment:
   ```suggestion
   logger.exception("Database query failed")
   ```
   
   Would this be more useful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-26 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2108061722


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -100,35 +102,6 @@ def add_deadline(cls, deadline: Deadline, session: Session 
= NEW_SESSION):
 session.add(deadline)
 
 
-class DeadlineReference(Enum):

Review Comment:
   Do we need to leave an import in this module for compatibility?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-26 Thread via GitHub


ramitkataria commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2107923552


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -183,3 +156,52 @@ def serialize_deadline_alert(self):
 "callback_kwargs": self.callback_kwargs,
 }
 )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
+"""
+Fetch a datetime value from the database using the provided model 
reference and filtering conditions.
+
+For example, to fetch a TaskInstance's start_date:
+_fetch_from_db(
+TaskInstance.start_date, dag_id='example_dag', 
task_id='example_task', run_id='example_run'
+)
+
+This generates SQL equivalent to:
+SELECT start_date
+FROM task_instance
+WHERE dag_id = 'example_dag'
+AND task_id = 'example_task'
+AND run_id = 'example_run'
+
+:param model_reference: SQLAlchemy Column to select (e.g., 
DagRun.logical_date, TaskInstance.start_date)
+:param conditions: Filtering conditions applied as equality comparisons in 
the WHERE clause.
+   Multiple conditions are combined with AND.
+:param session: SQLAlchemy session (auto-provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)

Review Comment:
   I also think your version is easier to read @ferruzzi 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-23 Thread via GitHub


ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2905249533

   ah, found [the email 
thread](https://lists.apache.org/thread/f7t5zl6t3t0s89rt37orfcv4966crojt) I was 
looking for; I always forget that the list's search function defaults to "newer 
than a month".   Based on that discussion, I'm dropping the caplog part of the 
tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103865896


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+"""Base class for all Deadline implementations."""
+
+# Set of required kwargs - subclasses should override this.
+required_kwargs: set[str] = set()
+
+def evaluate_with(self, **kwargs: Any) -> datetime:
+"""Validate the provided kwargs and evaluate this deadline with the 
given conditions."""
+filtered_kwargs = {k: kwargs[k] for k in self.required_kwargs if k in 
kwargs}

Review Comment:
   Yeah, should be functionally identical.  I don't feel strongly either way, 
I'll go with yours.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103862801


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -183,3 +156,52 @@ def serialize_deadline_alert(self):
 "callback_kwargs": self.callback_kwargs,
 }
 )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
+"""
+Fetch a datetime value from the database using the provided model 
reference and filtering conditions.
+
+For example, to fetch a TaskInstance's start_date:
+_fetch_from_db(
+TaskInstance.start_date, dag_id='example_dag', 
task_id='example_task', run_id='example_run'
+)
+
+This generates SQL equivalent to:
+SELECT start_date
+FROM task_instance
+WHERE dag_id = 'example_dag'
+AND task_id = 'example_task'
+AND run_id = 'example_run'
+
+:param model_reference: SQLAlchemy Column to select (e.g., 
DagRun.logical_date, TaskInstance.start_date)
+:param conditions: Filtering conditions applied as equality comparisons in 
the WHERE clause.
+   Multiple conditions are combined with AND.
+:param session: SQLAlchemy session (auto-provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)

Review Comment:
   Do you feel strongly about this one?  I find your version harder to read for 
some reason, but they are functionally equivalent so I can do it if you want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103584921


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -183,3 +156,52 @@ def serialize_deadline_alert(self):
 "callback_kwargs": self.callback_kwargs,
 }
 )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
+"""
+Fetch a datetime value from the database using the provided model 
reference and filtering conditions.
+
+For example, to fetch a TaskInstance's start_date:
+_fetch_from_db(
+TaskInstance.start_date, dag_id='example_dag', 
task_id='example_task', run_id='example_run'
+)
+
+This generates SQL equivalent to:
+SELECT start_date
+FROM task_instance
+WHERE dag_id = 'example_dag'
+AND task_id = 'example_task'
+AND run_id = 'example_run'
+
+:param model_reference: SQLAlchemy Column to select (e.g., 
DagRun.logical_date, TaskInstance.start_date)
+:param conditions: Filtering conditions applied as equality comparisons in 
the WHERE clause.
+   Multiple conditions are combined with AND.
+:param session: SQLAlchemy session (auto-provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)

Review Comment:
   ```suggestion
   query = query.where(*(getattr(model_reference.class_, k) == v for for k, 
v in conditions.items()))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103588710


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+"""Base class for all Deadline implementations."""
+
+# Set of required kwargs - subclasses should override this.
+required_kwargs: set[str] = set()
+
+def evaluate_with(self, **kwargs: Any) -> datetime:
+"""Validate the provided kwargs and evaluate this deadline with the 
given conditions."""
+filtered_kwargs = {k: kwargs[k] for k in self.required_kwargs if k in 
kwargs}

Review Comment:
   ```suggestion
   filtered_kwargs = {k: v for k, v in kwargs.items() if k in 
self.required_kwargs}
   ```
   
   I think this is equivalent and easier to read?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103372721


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -183,3 +156,36 @@ def serialize_deadline_alert(self):
 "callback_kwargs": self.callback_kwargs,
 }
 )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_reference: SQLAlchemy Column reference (e.g., 
DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+:param conditions: Key-value pairs which are passed to the WHERE clause.
+
+:param session: SQLAlchemy session (provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)
+
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+logger.debug("db query: session.scalar(%s)", query)
+
+try:
+result = session.scalar(query)
+except SQLAlchemyError as e:
+logger.error("Database query failed: (%s)", str(e))
+raise
+
+if result is None:
+message = "No matching record found in the database."
+logger.error(message)

Review Comment:
   I've updated all messaging and the docstring for this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103304812


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+"""Base class for all Deadline implementations."""
+
+# Whether the evaluation requires conditions.
+requires_conditions = True
+
+# Set of required kwargs - subclasses should override this.
+required_kwargs: set[str] = set()
+
+def evaluate_with(self, **kwargs: Any) -> datetime:
+"""Validate the provided kwargs and evaluate this deadline with the 
given conditions."""
+missing_kwargs = self.required_kwargs - set(kwargs.keys())
+if missing_kwargs:
+raise ValueError(f"Missing required parameters: {', 
'.join(missing_kwargs)}")

Review Comment:
   I've added more robust kwarg verification both ways (missing and extra), 
thanks for the suggestion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103303252


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()

Review Comment:
   Alright, I have dropped evaluate().   It was overcomplicating things without 
adding any real value.  We can always add it back in later if people want it.  
Resolving this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2902213236

   hmm.  Task SDK tests failing here b7ut passing locally in Breeze I'll 
have to look into that I guess.
   
   
![image](https://github.com/user-attachments/assets/bb0a52db-2f0b-4d56-a833-5226a252d592)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2102812008


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+"""Base class for all Deadline implementations."""
+
+# Whether the evaluation requires conditions.
+requires_conditions = True
+
+# Set of required kwargs - subclasses should override this.
+required_kwargs: set[str] = set()
+
+def evaluate_with(self, **kwargs: Any) -> datetime:
+"""Validate the provided kwargs and evaluate this deadline with the 
given conditions."""
+missing_kwargs = self.required_kwargs - set(kwargs.keys())
+if missing_kwargs:
+raise ValueError(f"Missing required parameters: {', 
'.join(missing_kwargs)}")
+
+return self._evaluate_with(**kwargs)
+
+@abstractmethod
+def _evaluate_with(self, **kwargs: Any) -> datetime:
+"""Must be implemented by subclasses to perform the actual 
evaluation."""
+raise NotImplementedError
+
+def evaluate(self) -> datetime:
+"""Evaluate this deadline with no parameters."""
+if self.requires_conditions:
+raise AttributeError("This deadline requires additional 
conditions, use evaluate_with() instead.")
+return self.evaluate_with()

Review Comment:
   Responding to your comment below about using "context" there, maybe for now 
I'll try just dropping `evaluate()` and leave `evaluate_with()` as it is 
(without "context") and see how we feel about that.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2102798341


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+"""Base class for all Deadline implementations."""
+
+# Whether the evaluation requires conditions.
+requires_conditions = True
+
+# Set of required kwargs - subclasses should override this.
+required_kwargs: set[str] = set()
+
+def evaluate_with(self, **kwargs: Any) -> datetime:
+"""Validate the provided kwargs and evaluate this deadline with the 
given conditions."""
+missing_kwargs = self.required_kwargs - set(kwargs.keys())
+if missing_kwargs:
+raise ValueError(f"Missing required parameters: {', 
'.join(missing_kwargs)}")
+
+return self._evaluate_with(**kwargs)
+
+@abstractmethod
+def _evaluate_with(self, **kwargs: Any) -> datetime:
+"""Must be implemented by subclasses to perform the actual 
evaluation."""
+raise NotImplementedError
+
+def evaluate(self) -> datetime:
+"""Evaluate this deadline with no parameters."""
+if self.requires_conditions:
+raise AttributeError("This deadline requires additional 
conditions, use evaluate_with() instead.")
+return self.evaluate_with()

Review Comment:
   Yeah, it felt really strange using `reference.evaluate_with()` without 
anything "with" but I'm likely just overthinking that.  I could drop it and 
make things more streamlined if you really don't like it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2102798341


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+"""Base class for all Deadline implementations."""
+
+# Whether the evaluation requires conditions.
+requires_conditions = True
+
+# Set of required kwargs - subclasses should override this.
+required_kwargs: set[str] = set()
+
+def evaluate_with(self, **kwargs: Any) -> datetime:
+"""Validate the provided kwargs and evaluate this deadline with the 
given conditions."""
+missing_kwargs = self.required_kwargs - set(kwargs.keys())
+if missing_kwargs:
+raise ValueError(f"Missing required parameters: {', 
'.join(missing_kwargs)}")
+
+return self._evaluate_with(**kwargs)
+
+@abstractmethod
+def _evaluate_with(self, **kwargs: Any) -> datetime:
+"""Must be implemented by subclasses to perform the actual 
evaluation."""
+raise NotImplementedError
+
+def evaluate(self) -> datetime:
+"""Evaluate this deadline with no parameters."""
+if self.requires_conditions:
+raise AttributeError("This deadline requires additional 
conditions, use evaluate_with() instead.")
+return self.evaluate_with()

Review Comment:
   Yeah, it felt really strange using 
`DeadlineReference.FIXED_DATETIME(DEFAULT_DATE).evaluate_with()` without 
anything "with" but I'm likely just overthinking that.  I could drop it and 
make things more streamlined if you really don't like it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101687744


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+"""Base class for all Deadline implementations."""
+
+# Whether the evaluation requires conditions.
+requires_conditions = True
+
+# Set of required kwargs - subclasses should override this.
+required_kwargs: set[str] = set()
+
+def evaluate_with(self, **kwargs: Any) -> datetime:
+"""Validate the provided kwargs and evaluate this deadline with the 
given conditions."""
+missing_kwargs = self.required_kwargs - set(kwargs.keys())
+if missing_kwargs:
+raise ValueError(f"Missing required parameters: {', 
'.join(missing_kwargs)}")
+
+return self._evaluate_with(**kwargs)
+
+@abstractmethod
+def _evaluate_with(self, **kwargs: Any) -> datetime:
+"""Must be implemented by subclasses to perform the actual 
evaluation."""
+raise NotImplementedError
+
+def evaluate(self) -> datetime:
+"""Evaluate this deadline with no parameters."""
+if self.requires_conditions:
+raise AttributeError("This deadline requires additional 
conditions, use evaluate_with() instead.")
+return self.evaluate_with()

Review Comment:
   I feel this function is not necessary. We could just always use 
`evaluate_with()` directly. Probably not as pretty, but this is not supposed to 
be user-facing anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101686150


##
task-sdk/src/airflow/sdk/definitions/deadline_reference.py:
##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+"""Base class for all Deadline implementations."""
+
+# Whether the evaluation requires conditions.
+requires_conditions = True
+
+# Set of required kwargs - subclasses should override this.
+required_kwargs: set[str] = set()
+
+def evaluate_with(self, **kwargs: Any) -> datetime:
+"""Validate the provided kwargs and evaluate this deadline with the 
given conditions."""
+missing_kwargs = self.required_kwargs - set(kwargs.keys())
+if missing_kwargs:
+raise ValueError(f"Missing required parameters: {', 
'.join(missing_kwargs)}")

Review Comment:
   ```suggestion
   if missing_kwargs := {k for k in self.required_kwargs if k not in 
kwargs}:
   raise ValueError(f"Missing required parameters: {', 
'.join(missing_kwargs)}")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101683462


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -183,3 +156,36 @@ def serialize_deadline_alert(self):
 "callback_kwargs": self.callback_kwargs,
 }
 )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_reference: SQLAlchemy Column reference (e.g., 
DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+:param conditions: Key-value pairs which are passed to the WHERE clause.
+
+:param session: SQLAlchemy session (provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)
+
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+logger.debug("db query: session.scalar(%s)", query)
+
+try:
+result = session.scalar(query)
+except SQLAlchemyError as e:
+logger.error("Database query failed: (%s)", str(e))
+raise
+
+if result is None:
+message = "No matching record found in the database."
+logger.error(message)

Review Comment:
   We should include a bit more context here, such of what model and field we 
failed to fetch from.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101542705


##
airflow-core/src/airflow/models/deadline_reference.py:
##


Review Comment:
   Alright, I think I've done that.  @ashb , If you happen to get time to 
review that I've made it work with Task SDK correctly, Id appreciate that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101043739


##
airflow-core/src/airflow/models/deadline_reference.py:
##


Review Comment:
   Just asked Ash and he says you are right, I'll make the change.
   
   ```
   Anything used in dag code:sdk
   Anything only used to evaluate/check deadlines: core
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101043739


##
airflow-core/src/airflow/models/deadline_reference.py:
##


Review Comment:
   Just asked Ash and he says you are right, I'll make the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2100736613


##
airflow-core/src/airflow/models/deadline_reference.py:
##


Review Comment:
   I honestly have no idea?   Maybe?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2100697163


##
airflow-core/src/airflow/models/deadline_reference.py:
##
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+from enum import Enum
+
+from sqlalchemy import Column, select
+from sqlalchemy.exc import SQLAlchemyError
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import provide_session
+
+
+class CalculatedDeadline(LoggingMixin, Enum):
+"""
+Implementation class for deadlines which are calculated at runtime.
+
+Do not instantiate directly. Instead, use DeadlineReference:
+
+deadline = DeadlineAlert(
+reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
+interval=timedelta(hours=1),
+callback=hello_callback,
+)
+"""
+
+DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_reference: Column, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_reference: SQLAlchemy Column reference (e.g., 
DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+:param conditions: Key-value pairs which are passed to the WHERE 
clause.
+
+:param session: SQLAlchemy session (provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)
+
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+
+try:
+result = session.scalar(query)
+except SQLAlchemyError as e:
+self.log.error("Database query failed: (%s)", str(e))
+raise
+
+if result is None:
+message = "No matching record found in the database."
+self.log.error(message)
+raise ValueError(message)
+
+return result
+
+def dagrun_logical_date(self, dag_id: str) -> datetime:
+from airflow.models import DagRun
+
+return self._fetch_from_db(DagRun.logical_date, dag_id=dag_id)
+
+def dagrun_queued_at(self, dag_id: str) -> datetime:
+from airflow.models import DagRun
+
+return self._fetch_from_db(DagRun.queued_at, dag_id=dag_id)
+
+
+@dataclass
+class FixedDatetimeDeadline(LoggingMixin):
+"""Implementation class for fixed datetime deadlines."""
+
+_datetime: datetime
+
+def evaluate_with(self, **kwargs) -> datetime:
+"""
+Evaluate this deadline reference with the given kwargs.
+
+Ignores all kwargs as fixed deadlines don't need any parameters.
+"""
+if kwargs:
+self.log.debug("Fixed Datetime Deadlines do not accept conditions, 
ignoring kwargs: %s", kwargs)
+return self._datetime
+
+def evaluate(self) -> datetime:
+"""Evaluate this deadline reference with no parameters."""
+return self.evaluate_with()
+
+
+class DeadlineReference:
+"""
+The public interface class for all DeadlineReference options.
+
+This class provides a unified interface for working with Deadlines, 
supporting both
+calculated deadlines (which fetch values from the database) and fixed 
deadlines
+(which return a predefined datetime).
+
+--
+Usage:
+-  --

Review Comment:
   Thanks, fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2099436549


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+return session.scalar(query)

Review Comment:
   Thinking on this more, I am going to roll that back.  If no match is found 
then that should be a failure case.  There is no way that I can think of to 
recover from that and an expected alarm will not be going off later.  Changing 
the log messages that i added into exceptions in a coming commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-21 Thread via GitHub


ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2896675827

   Yeah, I don't like some of those changes in the last commit.  I have another 
commit coming which rolls some of those changes back and also implement TP's 
suggestion.  It made the models/deadline.py somewhat complicated so I moved 
these changes (and the relevant tests) into their own file(s), which is going 
to break your comment links.  Sorry in advance.  I'll push the new commit once 
mypy and static checks pass locally


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2099452935


##
airflow-core/src/airflow/models/deadline_reference.py:
##
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+from enum import Enum
+
+from sqlalchemy import Column, select
+from sqlalchemy.exc import SQLAlchemyError
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import provide_session
+
+
+class CalculatedDeadline(LoggingMixin, Enum):
+"""
+Implementation class for deadlines which are calculated at runtime.
+
+Do not instantiate directly. Instead, use DeadlineReference:
+
+deadline = DeadlineAlert(
+reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
+interval=timedelta(hours=1),
+callback=hello_callback,
+)
+"""
+
+DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_reference: Column, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_reference: SQLAlchemy Column reference (e.g., 
DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+:param conditions: Key-value pairs which are passed to the WHERE 
clause.
+
+:param session: SQLAlchemy session (provided by decorator)
+"""
+query = select(model_reference)
+
+for key, value in conditions.items():
+query = query.where(getattr(model_reference.class_, key) == value)
+
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+
+try:
+result = session.scalar(query)
+except SQLAlchemyError as e:
+self.log.error("Database query failed: (%s)", str(e))
+raise
+
+if result is None:
+message = "No matching record found in the database."
+self.log.error(message)
+raise ValueError(message)
+
+return result
+
+def dagrun_logical_date(self, dag_id: str) -> datetime:
+from airflow.models import DagRun
+
+return self._fetch_from_db(DagRun.logical_date, dag_id=dag_id)
+
+def dagrun_queued_at(self, dag_id: str) -> datetime:
+from airflow.models import DagRun
+
+return self._fetch_from_db(DagRun.queued_at, dag_id=dag_id)
+
+
+@dataclass
+class FixedDatetimeDeadline(LoggingMixin):
+"""Implementation class for fixed datetime deadlines."""
+
+_datetime: datetime
+
+def evaluate_with(self, **kwargs) -> datetime:
+"""
+Evaluate this deadline reference with the given kwargs.
+
+Ignores all kwargs as fixed deadlines don't need any parameters.
+"""
+if kwargs:
+self.log.debug("Fixed Datetime Deadlines do not accept conditions, 
ignoring kwargs: %s", kwargs)
+return self._datetime
+
+def evaluate(self) -> datetime:
+"""Evaluate this deadline reference with no parameters."""
+return self.evaluate_with()
+
+
+class DeadlineReference:
+"""
+The public interface class for all DeadlineReference options.
+
+This class provides a unified interface for working with Deadlines, 
supporting both
+calculated deadlines (which fetch values from the database) and fixed 
deadlines
+(which return a predefined datetime).
+
+--
+Usage:
+-  --

Review Comment:
   ```suggestion
   ---
   ```
   
   Acciedntal?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
UR

Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2099456345


##
airflow-core/src/airflow/models/deadline_reference.py:
##


Review Comment:
   I wonder if this module should be put in the task sdk instead. 
`_fetch_from_db` can be moved to `airflow.models.deadline` and imported inside 
`evaluate`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2099437979


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -97,11 +98,9 @@ def add_deadline(cls, deadline: Deadline, session: Session = 
NEW_SESSION):
 session.add(deadline)
 
 
-class DeadlineReference(Enum):
+class DeadlineReference(LoggingMixin, Enum):

Review Comment:
   Let me know what you think about the new implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098874986


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+return session.scalar(query)

Review Comment:
   Added a whole bunch of error messaging there, along with the tests to go 
with it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098744014


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -97,11 +98,9 @@ def add_deadline(cls, deadline: Deadline, session: Session = 
NEW_SESSION):
 session.add(deadline)
 
 
-class DeadlineReference(Enum):
+class DeadlineReference(LoggingMixin, Enum):

Review Comment:
   That's an interesting approach, thanks for the suggestion.  Let me see what 
I can work out along those lines.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098745558


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()

Review Comment:
   Maybe, but then I feel like just calling it evaluate() misses the implied 
expectation that there usually should be a condition there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098736749


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -97,11 +98,9 @@ def add_deadline(cls, deadline: Deadline, session: Session = 
NEW_SESSION):
 session.add(deadline)
 
 
-class DeadlineReference(Enum):
+class DeadlineReference(LoggingMixin, Enum):

Review Comment:
   This class is now awfully complex as an enum, and also does what normal 
enums don’t generally do (contain an arbitrary value via `FIXED_DATETIME`).
   
   Maybe we should split this into two classes like this
   
   ```python
   class DefinedDeadline(Enum):
   DAGRUN_LOGICAL_DATE = ...
   DAGRUN_QUEUED_AT = ...
   
   def evaluate(self, **kwargs):
   ...
   
   @dataclass
   class FixedDeadline:
   at: datetime
   
   def evaluate(self, **kwargs):
   ...
   
   class DeadlineReference:
   DAGRUN_LOGICAL_DATE = DefinedDeadline.DAGRUN_LOGICAL_DATE
   DAGRUN_QUEUED_AT = DefinedDeadline.DAGRUN_QUEUED_AT
   
   FIXED_DATETIME = FixedDeadline
   ```



##
airflow-core/src/airflow/models/deadline.py:
##
@@ -97,11 +98,9 @@ def add_deadline(cls, deadline: Deadline, session: Session = 
NEW_SESSION):
 session.add(deadline)
 
 
-class DeadlineReference(Enum):
+class DeadlineReference(LoggingMixin, Enum):

Review Comment:
   This class is now awfully complex as an enum, and also does what normal 
enums don’t generally do (contain an arbitrary value via `FIXED_DATETIME`).
   
   Maybe we should split this into two classes like this
   
   ```python
   class DefinedDeadline(Enum):
   DAGRUN_LOGICAL_DATE = ...
   DAGRUN_QUEUED_AT = ...
   
   def evaluate(self, **kwargs):
   ...
   
   @dataclass
   class FixedDeadline:
   at: datetime
   
   def evaluate(self, **kwargs):
   ...
   
   class DeadlineReference:
   DAGRUN_LOGICAL_DATE = DefinedDeadline.DAGRUN_LOGICAL_DATE
   DAGRUN_QUEUED_AT = DefinedDeadline.DAGRUN_QUEUED_AT
   FIXED_DATETIME = FixedDeadline
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098560679


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:

Review Comment:
   Thanks.   I'll leave this open for others to weigh in, but consider it a 
non-blocking comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098535047


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)

Review Comment:
   TIL - turns out this is for backwards compatibility 
   https://docs.python.org/3/howto/logging.html#logging-variable-data
   thanks for sharing the screenshot 
   (and I should get into the habit of trying things out and building in the 
airflow repo myself :p)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098547629


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()

Review Comment:
   how about just call it `evaluate` with optional args (instead of 
`evaluate_with`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098545729


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+return session.scalar(query)

Review Comment:
   agreed 
   this should not happen (rare), and shouldn't be fatal since technically 
things didn't fail.
   log warn makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098543605


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:

Review Comment:
   no strong opinion on this
   makes sense if we expect to be reusing this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098535047


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)

Review Comment:
   TIL - turns out this is for backwards compatibility 
   thanks for sharing the screenshot 
   (and I should get into the habit of trying things out and building in the 
airflow repo myself :p)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098520059


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()

Review Comment:
   Any opinions on this?   I honestly only added it because 
`DeadlineReference.FIXED_DATETIME(tomorrow).evaluate_with()` looked awkward, 
but maybe it's a bad idea to add the extra method?  I feel like the `with()` 
implies something is required, but maybe I'm just overthinking it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098509477


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+return session.scalar(query)
+
+def dagrun_logical_date(self, dag_id: str) -> datetime:
+from airflow.models import DagRun
+
+return self._fetch_from_db(DagRun, "logical_date", dag_id=dag_id)

Review Comment:
   Great suggestion, thanks.  Change coming.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098467290


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)

Review Comment:
   This time I'm not lying.  This is where the Airflow CI  forces us to not use 
f-strings.
   
   
![image](https://github.com/user-attachments/assets/d40e9c46-7138-4f6c-b364-888d1fa9c8eb)
   
![image](https://github.com/user-attachments/assets/73f7cfa3-c44b-40d2-bc30-854e8cddeebe)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098432341


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+return session.scalar(query)

Review Comment:
   I'm not sure how that would come to pass, but I suppose extra checks can't 
hurt.   What do you think?  If you set a deadline that is one hour after the 
dagrun is queued, and the system can't find the queued time, what would you 
expect it to do?  I'm thinking a log.warn maybe?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098428231


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"

Review Comment:
   Mostly just for reminding me to proof-read my PRs.  :sweat: That was 
left over from some testing I did where I needed a Reference that wasn't 
related to the dagrun before I settled on the `object.__new__` version of 
fixed_datetime.  I'll remove it.  Thanks for catching that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-20 Thread via GitHub


ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098415699


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:

Review Comment:
   I debated that a lot myself.   If anyone feels strongly, then I can drop it 
and the two current cases where it is used can be reverted to session.scalar 
calls.  Keep in mind that this is intended to be reused by any number of future 
References and may or may not be more useful with those.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-16 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093554618


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"

Review Comment:
   curious what we plan to use `_CUSTOM_REFERENCE_BASE` for



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-16 Thread via GitHub


vincbeck commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093561653


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)

Review Comment:
   Oh it is a an enum! My bad :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-16 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093560619


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+return session.scalar(query)

Review Comment:
   What's the intended behavior if no matching records found
   do we need to handle existence /  check if None here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-16 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093547767


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:
+"""
+Fetch a datetime stored in the database.
+
+:param model_class: The Airflow model class (e.g., DagRun, 
TaskInstance, etc.)
+:param column: The column name to fetch
+:param session: SQLAlchemy session (provided by decorator)
+
+:param conditions: Key-value pairs which are passed to the WHERE clause
+"""
+query = select(getattr(model_class, column))
+
+for key, value in conditions.items():
+query = query.where(getattr(model_class, key) == value)
+# This should build a query similar to:
+# session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == 
dag_id))
+self.log.debug("db query: session.scalar(%s)", query)
+return session.scalar(query)
+
+def dagrun_logical_date(self, dag_id: str) -> datetime:
+from airflow.models import DagRun
+
+return self._fetch_from_db(DagRun, "logical_date", dag_id=dag_id)

Review Comment:
   nit:
   consider using attribute reference from models to avoid magic/hard-coded 
string as col name here getting out of sync
   "logical_date" -> `DagRun.logical_date`
   
https://github.com/apache/airflow/blob/af06798ccc5d320e6174af9d4c7b10a7333f36e8/airflow-core/src/airflow/api/common/mark_tasks.py#L151



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-16 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093554618


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"

Review Comment:
   curious what we plan to use this for



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-86 - Add Deadline References [airflow]

2025-05-16 Thread via GitHub


1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093551012


##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)
+
+def evaluate(self):
+"""Call evaluate_with() without any conditions, because it looks 
strange in use that way."""
+return self.evaluate_with()
+
+@provide_session
+def _fetch_from_db(self, model_class: Base, column: str, session=None, 
**conditions) -> datetime:

Review Comment:
   just for my learning:
   do we need this layer of abstraction? seems like we are mostly giving one 
condition (dag id specific)



##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+return getattr(self, self.value)(**kwargs)

Review Comment:
   I think that's inherited from Enum 
https://github.com/python/cpython/blob/main/Lib/enum.py



##
airflow-core/src/airflow/models/deadline.py:
##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
 dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
 """
 
+# Available References.
+#
+# The value is the name of the method executed to fetch the datetime
+# value for the given Reference. For example DAGRUN_LOGICAL_DATE = 
"dagrun_logical_date"
+# will execute dagrun_logical_date() to find the dagrun's logical date.
 DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+DAGRUN_QUEUED_AT = "dagrun_queued_at"
+_CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+def __init__(self, value):
+self._fixed_dt = None  # Initialize the storage for fixed datetime
+super().__init__()
+
+@classmethod
+def FIXED_DATETIME(cls, dt: datetime):
+"""
+Calculate a reference based on a set datetime rather than fetching a 
value from the database.
+
+For example, you could set the Deadline for "tomorrow before 9AM" by
+providing the appropriate datetime object."
+"""
+instance = object.__new__(cls)
+instance._value_ = "fixed_datetime"
+instance._fixed_dt = dt
+return instance
+
+def evaluate_with(self, **kwargs):
+"""Call the method in the enum's value with the provided kwargs."""
+if self._fixed_dt:
+return self._fixed_dt
+