Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-06-24 Thread via GitHub


rawwar closed pull request #50957: Aip 86: add deadlines to DagRunResponse
URL: https://github.com/apache/airflow/pull/50957


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-06-23 Thread via GitHub


rawwar commented on PR #50957:
URL: https://github.com/apache/airflow/pull/50957#issuecomment-2998795623

   I haven't had time to finish this, so I'll let someone else pick it up and 
finish it off. At least the PR can act as a starting point for whoever takes it on.





Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-06-17 Thread via GitHub


rawwar commented on PR #50957:
URL: https://github.com/apache/airflow/pull/50957#issuecomment-2981327503

   > FYI, this PR was merged recently #51698 which will impact this PR.
   
   @pierrejeambrun, what kind of impact? Did you mean merge conflicts, or do I 
need to update something?





Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-06-14 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2146774803


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   @ferruzzi, if that's the case, should I keep this PR on hold until we add 
support for multiple deadlines?






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-06-10 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2138333416


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   I should also clarify that the code as currently written still handles a 
single deadline. I need to change that before launch: it WILL accept a list, 
but the current code in the repo doesn't show that yet.  Sorry if I misled or 
confused anyone on that point.  It came up in a Meetup discussion after I did 
the initial work, but it's a great idea and I'm going to be adding it in.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-06-10 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2138330321


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   I brought it up on the dev call last week and we're going to make (a 
variation on) the change you suggested.  Here's what the lifespan of a Deadline 
will be, using a DagRun as an example:
   
   - When a new DagRun is created:
     - If the DAG has a deadline, calculate the value and store it along with 
the dag_id, run_id, etc. in the `deadline` table.
   
   - When a DagRun finishes:
     - If (and only if) the current time is before the calculated deadline, 
remove the deadline from the `deadline` table.
   
   - The scheduler loop will query the `deadline` table to see if any deadlines 
have expired:
     - If yes, the callback is sent to the Triggerer to process.
     - Once the callback has run, the Triggerer will move the failed deadline 
to a new table (working name is just `missed_deadlines`?).
   
   So the `deadline` table will have all upcoming and unprocessed deadlines.  
The `missed_deadlines` table will have all expired deadlines which have been 
resolved.  If the serialized DAG has a deadline and it is in neither table, 
it will be assumed that it completed successfully.
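   Editor's sketch: the lifecycle described in this comment can be modelled 
in a few lines of Python. This is only an illustration under stated assumptions, 
not Airflow code: `Deadline`, `DeadlineStore`, and the method names are 
hypothetical, the two tables are plain in-memory containers, and each run is 
assumed to have a single deadline for brevity.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class Deadline:
    dag_id: str
    run_id: str
    deadline_time: datetime


class DeadlineStore:
    """In-memory stand-in for the `deadline` and `missed_deadlines` tables (hypothetical)."""

    def __init__(self):
        self.deadline = {}          # (dag_id, run_id) -> Deadline: upcoming / unprocessed
        self.missed_deadlines = []  # expired deadlines whose callback has run

    def on_dagrun_created(self, dag_id, run_id, deadline_time):
        # Store the calculated deadline when the run is created.
        self.deadline[(dag_id, run_id)] = Deadline(dag_id, run_id, deadline_time)

    def on_dagrun_finished(self, dag_id, run_id, now):
        # Remove the row if (and only if) the run finished before its deadline.
        row = self.deadline.get((dag_id, run_id))
        if row is not None and now < row.deadline_time:
            del self.deadline[(dag_id, run_id)]

    def scheduler_tick(self, now, run_callback):
        # Expired rows get their callback, then move to `missed_deadlines`.
        expired = [d for d in self.deadline.values() if d.deadline_time <= now]
        for row in expired:
            run_callback(row)
            del self.deadline[(row.dag_id, row.run_id)]
            self.missed_deadlines.append(row)
        return expired


start = datetime(2025, 6, 10, 12, 0)
store = DeadlineStore()
store.on_dagrun_created("etl", "run_1", start + timedelta(hours=1))
store.on_dagrun_created("etl", "run_2", start + timedelta(hours=1))
store.on_dagrun_finished("etl", "run_1", start + timedelta(minutes=30))  # on time
fired = []
store.scheduler_tick(start + timedelta(hours=2), fired.append)  # run_2 missed
```

   After the tick, `run_1` is in neither container (treated as completed on 
time) and `run_2` sits in `missed_deadlines`.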






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-06-09 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2136921222


##
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##
@@ -166,6 +180,13 @@ def get_dag_versions_dict(dag_versions: list[DagVersion]) -> list[dict]:
 ]
 
 
+def convert_instrumented_list_to_dict(lst):
+    result = [obj.__dict__ for obj in lst]
+    for obj in result:
+        obj.pop("_sa_instance_state", None)
+    return result
+
+

Review Comment:
   Yes. That's an additional item on the list and was failing tests due to a 
mismatch in comparison caused by `_sa_instance_state`.
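   Editor's sketch: SQLAlchemy ORM instances keep a bookkeeping entry named 
`_sa_instance_state` in their `__dict__`, which is why a direct comparison 
against plain dicts fails. The variant below is a hedged illustration, not the 
PR's code: `instances_to_dicts` and `FakeDeadlineRow` are hypothetical names, 
and the fake row only imitates an ORM object so the example runs without 
SQLAlchemy. Unlike the helper in the diff, it copies each `__dict__` instead of 
popping the key from the live object.

```python
def instances_to_dicts(instances):
    """Return plain-dict copies of each object's attributes, minus `_sa_instance_state`."""
    return [
        {key: value for key, value in vars(obj).items() if key != "_sa_instance_state"}
        for obj in instances
    ]


class FakeDeadlineRow:
    """Stand-in for an ORM instance; real SQLAlchemy objects carry `_sa_instance_state`."""

    def __init__(self, run_id, deadline_time):
        self._sa_instance_state = object()
        self.run_id = run_id
        self.deadline_time = deadline_time


rows = [FakeDeadlineRow("run_1", "2025-06-09T12:00:00")]
as_dicts = instances_to_dicts(rows)
```

   Because the dicts are copies, the original objects still carry their 
`_sa_instance_state` attribute afterwards.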






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-06-02 Thread via GitHub


pierrejeambrun commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2120350829


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   Maybe I misunderstood something. As I understand it, there's no record in 
the db at all for deadlines unless the run is in progress?
   
   I assume we can always improve later and add that if people request it. 
(soft delete etc...)



##
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##
@@ -166,6 +180,13 @@ def get_dag_versions_dict(dag_versions: list[DagVersion]) -> list[dict]:
 ]
 
 
+def convert_instrumented_list_to_dict(lst):
+    result = [obj.__dict__ for obj in lst]
+    for obj in result:
+        obj.pop("_sa_instance_state", None)
+    return result
+
+

Review Comment:
   Is that needed?



##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   `deadlines: list[DeadlineResponse]` should be enough; the relationship 
should return an empty list when there are no related items.



##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   From the API point of view, we can nest the `deadline` API inside the 
`dagrun` one, because a deadline can be considered a sub-resource of the dagrun: 
deadlines only exist for their corresponding dagrun, and only while the dagrun 
is in a specific state; otherwise they are removed. (But that's debatable: 
since all the API resources are flat at root level for now, we could do the same 
here.)






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-26 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2107658391


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   If you want to change the plan at this point, please start a dev list 
discussion for it.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-23 Thread via GitHub


pierrejeambrun commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2104680420


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   I think we shouldn't remove rows; maybe just mark them `archived` via a 
boolean in the db. This way we can still access the history for UI or audit. 
Adding a `completed_at` column to record when the deadline was completed could 
also be an interesting option.
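   Editor's sketch: the soft-delete idea above, illustrated with the standard 
library's `sqlite3`. The schema is an assumption made for the example (column 
names other than `archived` and `completed_at` are hypothetical) and is not 
Airflow's actual `deadline` table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE deadline (
        id INTEGER PRIMARY KEY,
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        deadline_time TEXT NOT NULL,
        archived INTEGER NOT NULL DEFAULT 0,  -- boolean flag instead of deleting the row
        completed_at TEXT                     -- when the deadline was resolved
    )
    """
)
conn.executemany(
    "INSERT INTO deadline (dag_id, run_id, deadline_time) VALUES (?, ?, ?)",
    [
        ("etl", "run_1", "2025-05-23T10:00:00"),
        ("etl", "run_2", "2025-05-23T11:00:00"),
    ],
)


def archive_deadline(conn, run_id, completed_at):
    """Mark a run's deadline rows as archived rather than removing them."""
    conn.execute(
        "UPDATE deadline SET archived = 1, completed_at = ? WHERE run_id = ?",
        (completed_at, run_id),
    )


archive_deadline(conn, "run_1", "2025-05-23T09:45:00")

# Active deadlines are what the scheduler would still watch.
active = conn.execute("SELECT run_id FROM deadline WHERE archived = 0").fetchall()
# Archived rows stay queryable for UI or audit purposes.
history = conn.execute(
    "SELECT run_id, completed_at FROM deadline WHERE archived = 1"
).fetchall()
```

   The trade-off is that every query for pending deadlines has to filter on 
`archived = 0`, and the table grows until old rows are purged.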






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-23 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2104148488


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past 
then send the callback for execution and remove the row from the table [[TBD, 
the scheduler loop may loop here or may trigger a subprocess to loop here until 
MIN(timestamp) is not in the past]]
   
   Is removing data from the deadlines table necessary?
   
   If we remove it, we have no way to show that past DagRuns have missed their 
SLA. In fact, there's no way for the front-end to show any kind of Deadline 
breach alert persistently.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-23 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2104148488


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > From my side, this may be an issue. I haven't implemented the "what 
happens when a deadline is missed" portion yet, but my intention is essentially:
   
   @ferruzzi, I believe this can be done by simply querying the deadlines table 
with the filter deadline > current_time and returning the results. 






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103894496


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > I think "show me recent dag runs with passed deadlines" would be a key 
feature.
   
   From my side, this may be an issue.  I haven't implemented the "what happens 
when a deadline is missed" portion yet, but my intention is essentially:
   
   ===
   
   On dag run creation, check if there is a deadline in the DAG.  If yes, then 
fetch the Reference, add the Interval, and store (the resulting timestamp, 
callback, dag_id, and run_id) in the Deadlines table
 
 
   On dagrun completion/cleanup, if there is a deadline then remove all  rows 
matching this run_id  from the table
   
   In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past 
then send the callback for execution
   
   ===
   
   Which means that as I have this designed, it is inherently destructive.  The 
rows are removed when they are no longer needed, so if you want that data to be 
persistent, we'll need some way to do that.  It could just be a bool flag in 
the dagrun table for "deadline was missed" or maybe something else, I'm not 
sure.   Just throwing it out there in case you were counting on the Deadline 
table to be a reliable source.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103894496


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > I think "show me recent dag runs with passed deadlines" would be a key 
feature.
   
   From my side, this may be an issue.  I haven't implemented the "what happens 
when a deadline is missed" portion yet, but my intention is essentially:
   
   ===
   
   On dag run creation, check if there is a deadline in the DAG.  If yes, then 
fetch or calculate the Reference, add the Interval, and store (the resulting 
timestamp, callback, callback kwargs, dag_id, and run_id) in the Deadlines table
 
 
   On dagrun completion/cleanup, if there is a deadline then remove all  rows 
matching this run_id  from the table
   
   In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past 
then send the callback for execution
   
   ===
   
   Which means that as I have this designed, it is inherently destructive.  The 
rows are removed when they are no longer needed, so if you want that data to be 
persistent, we'll need some way to do that.  It could just be a bool flag in 
the dagrun table for "deadline was missed" or maybe something else, I'm not 
sure.   Just throwing it out there in case you were counting on the Deadline 
table to be a reliable source.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103894496


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > I think "show me recent dag runs with passed deadlines" would be a key 
feature.
   
   From my side, this may be an issue.  I haven't implemented the "what happens 
when a deadline is missed" portion yet, but my intention is essentially:
   
   ===
   
   On dag run creation, check if there is a deadline in the DAG.  If yes, then 
fetch or calculate the Reference, add the Interval, and store (the resulting 
timestamp, callback, callback kwargs, dag_id, and run_id) in the Deadlines table
 
 
   On dagrun completion/cleanup, if there is a deadline then remove all  rows 
matching this run_id  from the table
   
   In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past 
then send the callback for execution and remove the row from the table  [[TBD, 
the scheduler loop may loop here or may trigger a subprocess to loop here until 
MIN(timestamp) is not in the past]]
   
   ===
   
   Which means that as I have this designed, it is inherently destructive.  The 
rows are removed when they are no longer needed, so if you want that data to be 
persistent, we'll need some way to do that.  It could just be a bool flag in 
the dagrun table for "deadline was missed" or maybe something else, I'm not 
sure.   Just throwing it out there in case you were counting on the Deadline 
table to be a reliable source.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103894496


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > I think "show me recent dag runs with passed deadlines" would be a key 
feature.
   
   From my side, this may be an issue.  I haven't implemented the "what happens 
when a deadline is missed" portion yet, but my intention is essentially:
   
   ===
   
   On dag run creation, check if there is a deadline in the DAG.  If yes, then 
fetch or calculate the Reference, add the Interval, and store (the resulting 
timestamp, callback, dag_id, and run_id) in the Deadlines table
 
 
   On dagrun completion/cleanup, if there is a deadline then remove all  rows 
matching this run_id  from the table
   
   In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past 
then send the callback for execution
   
   ===
   
   Which means that as I have this designed, it is inherently destructive.  The 
rows are removed when they are no longer needed, so if you want that data to be 
persistent, we'll need some way to do that.  It could just be a bool flag in 
the dagrun table for "deadline was missed" or maybe something else, I'm not 
sure.   Just throwing it out there in case you were counting on the Deadline 
table to be a reliable source.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103894496


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > I think "show me recent dag runs with passed deadlines" would be a key 
feature.
   
   From my side, this may be an issue.  I haven't implemented the "what happens 
when a deadline is missed" portion yet, but my intention is essentially:
   
   ===
   
   On dag run creation, check if there is a deadline in the DAG.  If yes, then 
fetch or calculate the Reference, add the Interval, and store (the resulting 
timestamp, callback, callback kwargs, dag_id, and run_id) in the Deadlines table
 
 
   On dagrun completion/cleanup, if there is a deadline then remove all  rows 
matching this run_id  from the table
   
   In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past 
then send the callback for execution and remove the row from the table  [[TBD, 
the scheduler loop may loop here or may trigger a subprocess to loop here until 
MIN(timestamp) is not in the past]]
   
   ===
   
   Which means that the way I have this system designed, it is inherently 
destructive.  The rows are removed when they are no longer needed, so if you 
want that data to be persistent, we'll need some way to do that.  It could just 
be a bool flag in the dagrun table for "deadline was missed" or maybe something 
else, I'm not sure.   Just throwing it out there in case you were counting on 
the Deadline table to be a reliable source.
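   Editor's sketch: the scheduler-loop step described above (take the earliest 
deadline, send its callback if it is in the past, remove the row, repeat) can 
be illustrated with `sqlite3`. Everything here is an assumption for 
illustration: the table layout, the `process_expired_deadlines` name, and the 
choice to loop inline rather than in a subprocess, which the comment leaves as 
TBD. Timestamps are ISO-8601 strings in one fixed format, so string comparison 
matches chronological order.

```python
import sqlite3


def process_expired_deadlines(conn, now, send_callback):
    """Handle deadlines earliest-first until the earliest one is no longer in the past."""
    processed = 0
    while True:
        # Equivalent to looking up MIN(deadline_time) and fetching that row.
        row = conn.execute(
            "SELECT id, run_id, callback, deadline_time FROM deadline "
            "ORDER BY deadline_time LIMIT 1"
        ).fetchone()
        if row is None or row[3] > now:
            return processed
        deadline_id, run_id, callback, _ = row
        send_callback(run_id, callback)
        # Destructive step: the row is gone once the callback has been sent.
        conn.execute("DELETE FROM deadline WHERE id = ?", (deadline_id,))
        processed += 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE deadline (id INTEGER PRIMARY KEY, dag_id TEXT, run_id TEXT, "
    "callback TEXT, deadline_time TEXT)"
)
conn.executemany(
    "INSERT INTO deadline (dag_id, run_id, callback, deadline_time) VALUES (?, ?, ?, ?)",
    [
        ("etl", "run_1", "notify", "2025-05-22T09:00:00"),
        ("etl", "run_2", "notify", "2025-05-22T10:00:00"),
        ("etl", "run_3", "notify", "2025-05-22T12:00:00"),
    ],
)
sent = []
count = process_expired_deadlines(
    conn, "2025-05-22T11:00:00", lambda run_id, callback: sent.append(run_id)
)
remaining = [r[0] for r in conn.execute("SELECT run_id FROM deadline")]
```

   Once the loop returns, nothing in the table records that `run_1` and `run_2` 
missed their deadlines, which is the persistence concern raised in this thread.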






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103894496


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > I think "show me recent dag runs with passed deadlines" would be a key 
feature.
   
   From my side, this may be an issue.  I haven't implemented the "what happens 
when a deadline is missed" portion yet, but my intention is essentially:
   
   ===
   On dag run creation, check if there is a deadline.  If yes, then fetch the 
Reference, add the Interval, and store (the resulting timestamp, callback, 
dag_id, and run_id) in the Deadlines table
 
 
   On dagrun completion/cleanup, if there is a deadline then remove all  rows 
matching this run_id  from the table
   
   In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past 
then send the callback for execution
   ===
   
   Which means that as I have this designed, it is inherently destructive.  The 
rows are removed when they are no longer needed, so if you want that data to be 
persistent, we'll need some way to do that.  It could just be a bool flag in 
the dagrun table for "deadline was missed" or maybe something else, I'm not 
sure.   Just throwing it out there in case you were counting on the Deadline 
table to be a reliable source.









Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103894496


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > I think "show me recent dag runs with passed deadlines" would be a key 
feature.
   
   From my side, this may be an issue.  I haven't implemented the "what happens 
when a deadline is missed" portion yet, but my intention is essentially:
   
   ```
   On dag run creation, check if there is a deadline.  If yes, then fetch the 
Reference, add the Interval, and store (the resulting timestamp, callback, 
dag_id, and run_id) in the Deadlines table
 
 
   On dagrun completion/cleanup, if there is a deadline then remove all  rows 
matching this run_id  from the table
   
   In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past 
then send the callback for execution
   ```
   
   Which means that as I have this designed, it is inherently destructive.  The 
rows are removed when they are no longer needed, so if you want that data to be 
persistent, we'll need some way to do that.  It could just be a bool flag in 
the dagrun table for "deadline was missed" or maybe something else, I'm not 
sure.   Just throwing it out there in case you were counting on the Deadline 
table to be a reliable source.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103885362


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   >  Oh I missed this. A dag run can have multiple deadlines?
   
   To be fair, it was not part of the initial proposal but added due to 
overwhelming popular demand, as they say.  I had several users ask me to make 
sure that is possible so they can do tiered/escalated response patterns and it 
seemed like an easy enough addition so I said I would make it happen.  It may 
or may not have ever made it into the official AIP yet.
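   Editor's sketch: one way to picture the tiered/escalated pattern mentioned 
above is a run with several deadlines, each a Reference plus its own Interval 
and callback. The names below (`DeadlineSpec`, `resolve_deadlines`, and the 
callback labels) are hypothetical and only illustrate the idea; they are not 
the Airflow API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class DeadlineSpec:
    interval: timedelta  # offset added to the reference time
    callback: str        # label for the action to take if this tier is missed


def resolve_deadlines(reference, specs):
    """Return (deadline_time, callback) pairs for one run, earliest first."""
    return sorted((reference + spec.interval, spec.callback) for spec in specs)


tiers = [
    DeadlineSpec(timedelta(hours=2), "page_on_call"),
    DeadlineSpec(timedelta(minutes=30), "notify_channel"),
]
resolved = resolve_deadlines(datetime(2025, 5, 22, 9, 0), tiers)
```

   Here the softer notification would fire first at 09:30 and the escalation at 
11:00, assuming the run is still unfinished at each point.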






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103593763


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
 end_date: datetime | None
 data_interval_start: datetime | None
 data_interval_end: datetime | None
+deadlines: list[DeadlineResponse] | None

Review Comment:
   > Ok and then on the DagResponse we will have the full DeadlineAlert with 
the reference, interval and callback?
   
   Yes. I'll add an issue and create a PR next.
   
   
   EDIT: I just checked the Dag model. We store the deadline object in it. 
~~There's nowhere we currently save a full "DeadlineAlert" object~~ 
   
   Just rechecked: the deadline column in the DAG table will hold the DeadlineAlerts.
   
   
   






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103593763


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   > Ok and then on the DagResponse we will have the full DeadlineAlert with 
the reference, interval and callback?
   
   Yes. I'll add an issue and create a PR next.
   
   
   EDIT: I just checked the Dag model. We store the deadline object in it. 
~~There's nowhere we currently save a full "DeadlineAlert" object~~ 
   
   Just rechecked: the deadline column in the DAG table will hold the DeadlineAlert.
   
   
   






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103593763


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   > Ok and then on the DagResponse we will have the full DeadlineAlert with 
the reference, interval and callback?
   
   Yes. I'll add an issue and create a PR next.
   
   
   EDIT: I just checked the Dag model. We store the deadline object in it. There's 
nowhere we currently save a full "DeadlineAlert" object.
   
   
   






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103593763


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   > Ok and then on the DagResponse we will have the full DeadlineAlert with 
the reference, interval and callback?
   
   Yes. I'll add an issue and create a PR next.
   
   
   






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


bbovenzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103461274


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   Keeping some deadline information on DagRunResponse is valuable, because I 
think "show me recent dag runs with passed deadlines" would be a key 
feature.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


bugraoz93 commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103417038


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   How about making this a separate endpoint and calling it for the DagRun 
retrieved, simply passing the dag_run_id? This way, we don't grow the 
DagRunResponse too much; it already carries a lot of information. We can have 
the deadline routes as a separate endpoint if that is okay from the UI perspective.
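
A rough sketch of the separate-lookup shape, in plain Python rather than real routes. `DAG_RUNS`, `DEADLINES`, `get_dag_run` and `get_dag_run_deadlines` are hypothetical names used only for illustration; none of them come from the Airflow API or from this PR.

```python
from datetime import datetime, timezone

# Hypothetical in-memory data standing in for the database.
DAG_RUNS = {"run_1": {"dag_run_id": "run_1", "state": "running"}}
DEADLINES = {
    "run_1": [
        datetime(2025, 5, 22, 13, 0, tzinfo=timezone.utc),
        datetime(2025, 5, 22, 14, 0, tzinfo=timezone.utc),
    ]
}


def get_dag_run(dag_run_id: str) -> dict:
    """Run lookup: the payload stays free of deadline data."""
    return DAG_RUNS[dag_run_id]


def get_dag_run_deadlines(dag_run_id: str) -> list[str]:
    """Separate lookup: only the deadlines of one run, as ISO 8601 strings."""
    return [d.isoformat() for d in DEADLINES.get(dag_run_id, [])]


run = get_dag_run("run_1")
deadlines = get_dag_run_deadlines(run["dag_run_id"])
print(run)
print(deadlines)
```

The trade-off is one extra call per run: the run payload stays small, but a view that lists many runs would need one deadline lookup for each of them.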






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


bugraoz93 commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103417038


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   How about making this a separate endpoint and calling it for the DAG 
retrieved, simply passing the dag_run_id? This way, we don't grow the 
DagRunResponse too much; it already carries a lot of information. We can have 
the deadline routes as a separate endpoint if that is okay from the UI perspective.






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


bbovenzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103351382


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   Ok and then on the DagResponse we will have the full DeadlineAlert with the 
reference, interval and callback?






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103058148


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   Yes, here's an example that @ferruzzi provided:
   
   ```
   with DAG(
       dag_id="dag_with_deadlines",
       deadline=[
           # Tier 1: one hour after the run was queued
           DeadlineAlert(
               reference=DeadlineReference.DAGRUN_QUEUED_AT,
               interval=timedelta(hours=1),
               callback=warn_me_it_may_be_late,
           ),
           # Tier 2: two hours after the run was queued
           DeadlineAlert(
               reference=DeadlineReference.DAGRUN_QUEUED_AT,
               interval=timedelta(hours=2),
               callback=definitely_late,
           ),
           # Tier 3: three hours after the run was queued
           DeadlineAlert(
               reference=DeadlineReference.DAGRUN_QUEUED_AT,
               interval=timedelta(hours=3),
               callback=email_my_boss_that_it_wasnt_my_fault,
           ),
       ],
   ):
       ...  # tasks go here
   ```
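
As a rough illustration of what the three tiers above amount to, the sketch below turns one queued-at timestamp plus a list of intervals into absolute deadline times. It uses only the standard library. `deadline_times` is a hypothetical helper, and how Airflow actually resolves `DAGRUN_QUEUED_AT` is not shown in this thread, so treat the arithmetic as an assumption.

```python
from datetime import datetime, timedelta, timezone


def deadline_times(queued_at: datetime, intervals: list[timedelta]) -> list[datetime]:
    """Return one absolute deadline per interval, measured from queued_at."""
    return [queued_at + interval for interval in intervals]


# Hypothetical queued-at time for a single dag run.
queued_at = datetime(2025, 5, 22, 12, 0, tzinfo=timezone.utc)
tiers = deadline_times(queued_at, [timedelta(hours=h) for h in (1, 2, 3)])
for tier in tiers:
    print(tier.isoformat())
```

Each run therefore ends up with several deadline entries, which is why the response field is a list rather than a single value.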






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103058148


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   Yeah. Here's an example which @ferruzzi gave me:
   
   ```
   with DAG(
       dag_id="dag_with_deadlines",
       deadline=[
           # Tier 1: one hour after the run was queued
           DeadlineAlert(
               reference=DeadlineReference.DAGRUN_QUEUED_AT,
               interval=timedelta(hours=1),
               callback=warn_me_it_may_be_late,
           ),
           # Tier 2: two hours after the run was queued
           DeadlineAlert(
               reference=DeadlineReference.DAGRUN_QUEUED_AT,
               interval=timedelta(hours=2),
               callback=definitely_late,
           ),
           # Tier 3: three hours after the run was queued
           DeadlineAlert(
               reference=DeadlineReference.DAGRUN_QUEUED_AT,
               interval=timedelta(hours=3),
               callback=email_my_boss_that_it_wasnt_my_fault,
           ),
       ],
   ):
       ...  # tasks go here
   ```






Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]

2025-05-22 Thread via GitHub


bbovenzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103034881


##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None

Review Comment:
   Oh I missed this. A dag run can have multiple deadlines?


