Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar closed pull request #50957: Aip 86: add deadlines to DagRunResponse URL: https://github.com/apache/airflow/pull/50957 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on PR #50957:
URL: https://github.com/apache/airflow/pull/50957#issuecomment-2998795623
I haven't had time to finish this, so I'll let someone else pick it up and finish it off. At least the PR can act as a starting point for whoever picks it up.
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on PR #50957:
URL: https://github.com/apache/airflow/pull/50957#issuecomment-2981327503
> FYI, this PR was merged recently #51698 which will impact this PR.
@pierrejeambrun, what kind of impact? Did you mean merge conflicts, or do I need to update something?
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2146774803
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
@ferruzzi, if that's the case, should I keep this PR on hold until we add support for multiple deadlines?
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2138333416
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
I should also clarify that the code as currently written still handles a single deadline. I need to make that change before launch: it WILL accept a list, but the current code in the repo doesn't show that yet. Sorry if I misled or confused anyone on that point. It came up in a Meetup discussion after I did the initial work, but it's a great idea and I'm going to add it.
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2138330321
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
I brought it up on the dev call last week and we're going to make (a variation on) the change you suggested. Here's what the lifespan of a Deadline will be, using Dagrun as an example:
- When a new Dagrun is created:
  - If the dag has a deadline, calculate the value and store it along with the dag_id, run_id, etc. in the `deadline` table
- When a dagrun finishes:
  - If (and only if) the current time is before the calculated deadline, then remove the deadline from the `deadline` table
- The scheduler loop will query the deadline table to see if any deadlines have expired
  - If yes, then the callback is sent to the Triggerer to process
  - Once the callback is run, the Triggerer will move the failed deadline to a new table (working name is just `missed_deadlines`?)

So the `deadlines` table will have all upcoming and unprocessed deadlines. The `missed_deadlines` table will have all expired deadlines which have been resolved. If the serialized DAG has a deadline and it is in neither table, then it will be assumed that it completed successfully.
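The lifespan described in this comment can be sketched roughly as follows. This is only an in-memory illustration, not Airflow code: `DeadlineRow`, `DeadlineTables`, and the method names are hypothetical, the two dicts stand in for the `deadline` and `missed_deadlines` tables, and treating a deadline as expired when `deadline_time <= now` is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


@dataclass(frozen=True)
class DeadlineRow:
    # Hypothetical row shape; the real table columns may differ.
    dag_id: str
    run_id: str
    deadline_time: datetime


class DeadlineTables:
    """In-memory stand-in for the `deadline` and `missed_deadlines` tables."""

    def __init__(self) -> None:
        self.deadline: dict[tuple[str, str], DeadlineRow] = {}
        self.missed_deadlines: dict[tuple[str, str], DeadlineRow] = {}

    def on_dagrun_created(
        self, dag_id: str, run_id: str, reference: datetime, interval: Optional[timedelta]
    ) -> None:
        # Only dags that define a deadline get a row.
        if interval is not None:
            self.deadline[(dag_id, run_id)] = DeadlineRow(dag_id, run_id, reference + interval)

    def on_dagrun_finished(self, dag_id: str, run_id: str, now: datetime) -> None:
        row = self.deadline.get((dag_id, run_id))
        # Remove the row if (and only if) the run finished before its deadline.
        if row is not None and now < row.deadline_time:
            del self.deadline[(dag_id, run_id)]

    def scheduler_tick(
        self, now: datetime, run_callback: Callable[[DeadlineRow], None]
    ) -> list[DeadlineRow]:
        expired = [row for row in self.deadline.values() if row.deadline_time <= now]
        for row in expired:
            run_callback(row)  # stands in for handing the callback to the Triggerer
            key = (row.dag_id, row.run_id)
            # After the callback has run, the row moves to the missed table.
            self.missed_deadlines[key] = self.deadline.pop(key)
        return expired
```

Under this sketch, a run whose dag defines a deadline but which appears in neither dict is the "completed successfully" case from the comment.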
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2136921222
##
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##
@@ -166,6 +180,13 @@ def get_dag_versions_dict(dag_versions: list[DagVersion]) -> list[dict]:
]
+def convert_instrumented_list_to_dict(lst):
+    result = [obj.__dict__ for obj in lst]
+    for obj in result:
+        obj.pop("_sa_instance_state", None)
+    return result
+
+
Review Comment:
Yes. That's an additional item on the list and was failing tests due to a
mismatch in comparison caused by `_sa_instance_state`.
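One thing worth noting about the helper in the diff: `obj.__dict__` is the instance's own attribute dict, so popping `_sa_instance_state` from it also removes that attribute from the object itself. A non-mutating variant might look like the sketch below. `FakeDeadlineRow` is a hypothetical stand-in for a mapped ORM object so the example runs without SQLAlchemy; it is not part of the PR.

```python
def convert_instrumented_list_to_dict(lst):
    """Return plain dicts for ORM-like objects, leaving the objects untouched."""
    return [
        # Build a new dict instead of popping from the live __dict__.
        {key: value for key, value in vars(obj).items() if key != "_sa_instance_state"}
        for obj in lst
    ]


class FakeDeadlineRow:
    """Hypothetical stand-in for a mapped ORM object (illustration only)."""

    def __init__(self, run_id):
        self.run_id = run_id
        self._sa_instance_state = object()  # mimics the ORM bookkeeping attribute


rows = [FakeDeadlineRow("run_1")]
plain = convert_instrumented_list_to_dict(rows)
```

Here `plain` can be compared against expected dicts in a test, while `rows[0]` still carries its `_sa_instance_state` attribute.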
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
pierrejeambrun commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2120350829
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
Maybe I misunderstood something. As I understand it, there's no record in
the db at all for deadlines unless the run is in progress?
I assume we can always improve later and add that if people request it.
(soft delete etc...)
##
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##
@@ -166,6 +180,13 @@ def get_dag_versions_dict(dag_versions: list[DagVersion]) -> list[dict]:
]
+def convert_instrumented_list_to_dict(lst):
+    result = [obj.__dict__ for obj in lst]
+    for obj in result:
+        obj.pop("_sa_instance_state", None)
+    return result
+
+
Review Comment:
Is that needed?
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
`deadlines: list[DeadlineResponse]` should be enough; the relationship
should return an empty list when there are no related items.
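To illustrate why the non-optional type is simpler for consumers, here is a small sketch using plain functions rather than the actual Pydantic model. The function names are hypothetical, and plain `datetime` values stand in for `DeadlineResponse` objects.

```python
from datetime import datetime
from typing import Optional


def earliest_with_optional(deadlines: Optional[list[datetime]]) -> Optional[datetime]:
    # With `list[...] | None`, every consumer must handle both None and [].
    if deadlines is None or not deadlines:
        return None
    return min(deadlines)


def earliest(deadlines: list[datetime]) -> Optional[datetime]:
    # With `list[...]`, an empty list is the only "no deadlines" case.
    return min(deadlines, default=None)
```

If the relationship always yields a list, as the comment expects, the second form is all a client needs.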
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
From the API point of view, we can nest the `deadline` API inside the
`dagrun` one, because a deadline can be considered a sub-resource of the dagrun:
deadlines only exist for their corresponding dagrun, and only while the dagrun
is in a specific state; otherwise they are removed. (But that's debatable:
since all the API resources are flat at the root level for now, we could do the
same here.)
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2107658391
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
If you want to change the plan at this point, please start a dev list discussion for it.
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
pierrejeambrun commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2104680420
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
I think we shouldn't remove rows; maybe just mark them `archived` via a boolean in the db. This way we can still access the history for the UI or for audits. Adding a `completed_at` column to record when the deadline was completed could also be an interesting option.
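A minimal sketch of the soft-delete idea, using the standard-library `sqlite3` module rather than Airflow's models. The table layout and function names are assumptions made for illustration; only the `archived` flag and the `completed_at` column come from the comment.

```python
import sqlite3
from datetime import datetime, timezone


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical table layout; the real `deadline` table may differ.
    conn.execute(
        """
        CREATE TABLE deadline (
            dag_id TEXT NOT NULL,
            run_id TEXT NOT NULL,
            deadline_time TEXT NOT NULL,
            archived INTEGER NOT NULL DEFAULT 0,
            completed_at TEXT
        )
        """
    )


def archive_deadline(conn: sqlite3.Connection, run_id: str, completed_at: datetime) -> None:
    # Soft delete: flag the row instead of removing it.
    conn.execute(
        "UPDATE deadline SET archived = 1, completed_at = ? WHERE run_id = ?",
        (completed_at.isoformat(), run_id),
    )


def active_deadlines(conn: sqlite3.Connection) -> list:
    # What the scheduler would look at: rows not yet archived.
    return conn.execute(
        "SELECT run_id FROM deadline WHERE archived = 0 ORDER BY deadline_time"
    ).fetchall()


def deadline_history(conn: sqlite3.Connection, dag_id: str) -> list:
    # What the UI or an audit could read back later.
    return conn.execute(
        "SELECT run_id, deadline_time, completed_at FROM deadline "
        "WHERE dag_id = ? AND archived = 1 ORDER BY deadline_time",
        (dag_id,),
    ).fetchall()
```

The trade-off is that the scheduler's query now needs the `archived = 0` filter, and the table grows over time instead of staying small.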
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2104148488
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
> In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past then send the callback for execution and remove the row from the table [[TBD, the scheduler loop may loop here or may trigger a subprocess to loop here until MIN(timestamp) is not in the past]]
Is removing data from the deadlines table necessary? If we remove it, we have no way to show that past DagRuns have missed their SLA. In fact, there's no way for the front-end to show any kind of Deadline breach alert persistently.
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2104148488
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
> From my side, this may be an issue. I haven't implemented the "what happens when a deadline is missed" portion yet, but my intention is essentially:
@ferruzzi, I believe this can be done by simply querying the deadlines table with the filter `deadline > current_time` and returning the results.
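For reference, the two directions of that comparison select different sets: `deadline > current_time` matches deadlines still in the future, while passed deadlines need the opposite test. Which set the comment intended is not stated, so the sketch below simply returns both. The row shape and function name are hypothetical, and either query only works for rows that have not been deleted from the table.

```python
from datetime import datetime, timedelta, timezone


def split_by_deadline(rows: list[dict], now: datetime) -> tuple[list[dict], list[dict]]:
    """Partition rows into (upcoming, passed) relative to `now`."""
    upcoming = [row for row in rows if row["deadline_time"] > now]   # deadline > current_time
    passed = [row for row in rows if row["deadline_time"] <= now]    # deadline already reached
    return upcoming, passed
```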
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103894496
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
> I think "show me recent dag runs with passed deadlines" would be a key feature.
From my side, this may be an issue. I haven't implemented the "what happens when a deadline is missed" portion yet, but my intention is essentially:
===
On dag run creation, check if there is a deadline in the DAG. If yes, then fetch or calculate the Reference, add the Interval, and store (the resulting timestamp, callback, callback kwargs, dag_id, and run_id) in the Deadlines table
On dagrun completion/cleanup, if there is a deadline then remove all rows matching this run_id from the table
In the scheduler loop get Deadline.MIN(timestamp) and if it is in the past then send the callback for execution and remove the row from the table [[TBD, the scheduler loop may loop here or may trigger a subprocess to loop here until MIN(timestamp) is not in the past]]
===
Which means that as I have this designed, it is inherently destructive. The rows are removed when they are no longer needed, so if you want that data to be persistent, we'll need some way to do that. It could just be a bool flag in the dagrun table for "deadline was missed" or maybe something else, I'm not sure. Just throwing it out there in case you were counting on the Deadline table to be a reliable source.
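The scheduler-loop step, including the TBD "loop until MIN(timestamp) is not in the past" variant, could be sketched like this. It is an illustration over a plain list of dicts, not the scheduler's code: `process_expired`, `remove_run`, and the row keys are hypothetical, and counting a timestamp equal to `now` as "in the past" is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable


def remove_run(rows: list[dict], run_id: str) -> None:
    # On dagrun completion/cleanup: drop every row for this run_id.
    rows[:] = [row for row in rows if row["run_id"] != run_id]


def process_expired(
    rows: list[dict], now: datetime, send_callback: Callable[[str, str], None]
) -> int:
    """Send callbacks for expired deadlines, earliest first; return how many were sent."""
    sent = 0
    while rows:
        earliest = min(rows, key=lambda row: row["timestamp"])  # Deadline.MIN(timestamp)
        if earliest["timestamp"] > now:
            break  # the earliest deadline is still in the future, so nothing else is due
        send_callback(earliest["callback"], earliest["run_id"])
        rows.remove(earliest)  # destructive: the row is gone once the callback is sent
        sent += 1
    return sent
```

Because the row is removed right after the callback is sent, nothing in this structure records that a deadline was missed, which is the persistence gap the comment points out.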
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
ferruzzi commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103885362
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
> Oh I missed this. A dag run can have multiple deadlines?
To be fair, it was not part of the initial proposal but added due to overwhelming popular demand, as they say. I had several users ask me to make sure that is possible so they can do tiered/escalated response patterns and it seemed like an easy enough addition so I said I would make it happen. It may or may not have ever made it into the official AIP yet.
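As a rough picture of what a tiered/escalated pattern means for a single run, the sketch below expands several (interval, callback) pairs into one deadline timestamp each. The function name, the tuple shape, and the callback labels are hypothetical and are not taken from the AIP or the Airflow API.

```python
from datetime import datetime, timedelta, timezone


def expand_deadlines(
    reference: datetime, tiers: list[tuple[timedelta, str]]
) -> list[tuple[datetime, str]]:
    """Turn (interval, callback) tiers into (deadline_time, callback) pairs, earliest first."""
    return sorted((reference + interval, callback) for interval, callback in tiers)


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
escalation = expand_deadlines(
    start,
    [
        (timedelta(minutes=30), "page_on_call"),   # second tier: escalate
        (timedelta(minutes=10), "notify_slack"),   # first tier: gentle warning
    ],
)
```

Each resulting pair would be its own row for the run, which is why the response field is typed as a list.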
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on code in PR #50957:
URL: https://github.com/apache/airflow/pull/50957#discussion_r2103593763
##
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##
@@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel):
     end_date: datetime | None
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    deadlines: list[DeadlineResponse] | None
Review Comment:
> Ok and then on the DagResponse we will have the full DeadlineAlert with the reference, interval and callback?
Yes. I'll add an issue and create a PR next.
EDIT: I just checked the Dag model. We store the deadline object in it. ~~There's nowhere we currently save a full "DeadlineAlert" object~~ Just rechecked: the deadline column in the DAG table will hold DeadlineAlerts.
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on code in PR #50957: URL: https://github.com/apache/airflow/pull/50957#discussion_r2103593763 ## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py: ## @@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel): end_date: datetime | None data_interval_start: datetime | None data_interval_end: datetime | None +deadlines: list[DeadlineResponse] | None Review Comment: > Ok and then on the DagResponse we will have the full DeadlineAlert with the reference, interval and callback? Yes. I'll add an issue and create a PR next. EDIT: I just checked the Dag model. We store deadline object in it. ~~There's nowhere we currently save a full "DeadlineAlert" object~~ just rechecked. deadline column in DAG table will have DeadlineAlert -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on code in PR #50957: URL: https://github.com/apache/airflow/pull/50957#discussion_r2103593763 ## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py: ## @@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel): end_date: datetime | None data_interval_start: datetime | None data_interval_end: datetime | None +deadlines: list[DeadlineResponse] | None Review Comment: > Ok and then on the DagResponse we will have the full DeadlineAlert with the reference, interval and callback? Yes. I'll add an issue and create a PR next. EDIT: I just checked the Dag model. We store deadline object in it. There's nowhere we currently save a full "DeadlineAlert" object -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
bbovenzi commented on code in PR #50957: URL: https://github.com/apache/airflow/pull/50957#discussion_r2103461274 ## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py: ## @@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel): end_date: datetime | None data_interval_start: datetime | None data_interval_end: datetime | None +deadlines: list[DeadlineResponse] | None Review Comment: I think keeping some deadline information on DagRunResponse is valuable because "show me recent dag runs with passed deadlines" would be a key feature.
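The "recent dag runs with passed deadlines" query mentioned in this comment could be sketched client-side roughly as follows. This is only an illustration under stated assumptions: `DeadlineSketch`, `DagRunSketch`, and `runs_with_passed_deadlines` are hypothetical names, not Airflow classes, the fields of the real `DeadlineResponse` are not shown in this thread, and "passed" is assumed to mean the run had not finished by the deadline time.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class DeadlineSketch:
    # Hypothetical stand-in for one entry of DAGRunResponse.deadlines.
    deadline_time: datetime


@dataclass
class DagRunSketch:
    # Hypothetical stand-in for a DAGRunResponse payload.
    dag_run_id: str
    end_date: datetime | None
    deadlines: list[DeadlineSketch] | None = None


def runs_with_passed_deadlines(runs, now):
    """Return runs where at least one deadline passed before the run ended.

    A run that is still going (end_date is None) is compared against `now`.
    """
    missed = []
    for run in runs:
        finished_at = run.end_date or now
        if any(d.deadline_time < finished_at for d in run.deadlines or []):
            missed.append(run)
    return missed


utc = timezone.utc
now = datetime(2025, 5, 23, 12, 0, tzinfo=utc)
runs = [
    DagRunSketch("on_time", datetime(2025, 5, 23, 9, 0, tzinfo=utc),
                 [DeadlineSketch(datetime(2025, 5, 23, 10, 0, tzinfo=utc))]),
    DagRunSketch("late", datetime(2025, 5, 23, 11, 0, tzinfo=utc),
                 [DeadlineSketch(datetime(2025, 5, 23, 10, 0, tzinfo=utc))]),
    DagRunSketch("no_deadlines", None),
]
print([r.dag_run_id for r in runs_with_passed_deadlines(runs, now)])  # ['late']
```

Having the deadlines embedded in each run payload is what makes this a single pass over one list response; with a separate endpoint the same filter would need one extra lookup per run.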
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
bugraoz93 commented on code in PR #50957: URL: https://github.com/apache/airflow/pull/50957#discussion_r2103417038 ## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py: ## @@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel): end_date: datetime | None data_interval_start: datetime | None data_interval_end: datetime | None +deadlines: list[DeadlineResponse] | None Review Comment: How about making this a separate endpoint and calling it for the DagRun retrieved, simply passing the dag_run_id? This way, we don't grow the DagRunResponse too much; it already carries a lot of information. We can have the deadline routes as a separate endpoint if that is okay from the UI perspective.
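The separate-endpoint alternative proposed in this comment can be pictured as a lookup keyed by `dag_run_id`. The sketch below is plain Python rather than a real FastAPI route; `DeadlineSketch`, `DEADLINES`, and `get_deadlines` are hypothetical names, and the in-memory list merely stands in for wherever deadlines are actually stored.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class DeadlineSketch:
    # Hypothetical row: which run the deadline belongs to and when it is due.
    dag_run_id: str
    deadline_time: datetime


utc = timezone.utc

# Hypothetical in-memory stand-in for the stored deadlines.
DEADLINES = [
    DeadlineSketch("run_a", datetime(2025, 5, 23, 10, 0, tzinfo=utc)),
    DeadlineSketch("run_b", datetime(2025, 5, 23, 9, 30, tzinfo=utc)),
    DeadlineSketch("run_a", datetime(2025, 5, 23, 9, 0, tzinfo=utc)),
]


def get_deadlines(dag_run_id, deadlines=DEADLINES):
    """Return the deadlines of a single run, earliest first.

    This mimics what a dedicated deadlines route might return when it is
    called with the dag_run_id of a run the client has already fetched.
    """
    matching = [d for d in deadlines if d.dag_run_id == dag_run_id]
    return sorted(matching, key=lambda d: d.deadline_time)


for d in get_deadlines("run_a"):
    print(d.deadline_time.isoformat())
```

The trade-off is the one raised in the thread: `DagRunResponse` stays unchanged, but a client that wants deadlines for many runs makes one additional call per run.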
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
bbovenzi commented on code in PR #50957: URL: https://github.com/apache/airflow/pull/50957#discussion_r2103351382 ## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py: ## @@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel): end_date: datetime | None data_interval_start: datetime | None data_interval_end: datetime | None +deadlines: list[DeadlineResponse] | None Review Comment: Ok and then on the DagResponse we will have the full DeadlineAlert with the reference, interval and callback?
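One way to picture "the full DeadlineAlert with the reference, interval and callback" in an API response is to flatten the three parts into JSON-friendly values. The sketch below is an assumption for illustration only: `alert_to_payload` and its key names are hypothetical, the reference is represented as a plain string, and none of this is taken from Airflow's actual serialization.

```python
import json
from datetime import timedelta


def warn_me_it_may_be_late():
    """Placeholder callback; the name is borrowed from the example in this thread."""


def alert_to_payload(reference, interval, callback):
    """Flatten one alert definition into JSON-serializable values.

    reference: name of the reference point, e.g. "DAGRUN_QUEUED_AT"
    interval:  a timedelta, exported as a number of seconds
    callback:  a callable, exported as its dotted import path
    """
    return {
        "reference": reference,
        "interval_seconds": interval.total_seconds(),
        "callback": f"{callback.__module__}.{callback.__qualname__}",
    }


payload = alert_to_payload(
    "DAGRUN_QUEUED_AT", timedelta(hours=1), warn_me_it_may_be_late
)
print(json.dumps(payload, indent=2))
```

A callable cannot travel through JSON as-is, which is why the sketch exports only its import path.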
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
rawwar commented on code in PR #50957: URL: https://github.com/apache/airflow/pull/50957#discussion_r2103058148 ## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py: ## @@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel): end_date: datetime | None data_interval_start: datetime | None data_interval_end: datetime | None +deadlines: list[DeadlineResponse] | None Review Comment: Yes, here's an example that @ferruzzi provided:
```
with DAG(
    dag_id="dag_with_deadlines",
    deadline=[
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(hours=1),
            callback=warn_me_it_may_be_late,
        ),
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(hours=2),
            callback=definitely_late,
        ),
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(hours=3),
            callback=email_my_boss_that_it_wasnt_my_fault,
        ),
    ]
)
```
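To see why one DAG definition like this leads to several deadlines per run, here is a minimal sketch. It assumes that each deadline time is the reference timestamp (here the run's queued-at time) plus the alert's interval; `deadline_times` is a hypothetical helper, not an Airflow function, and the timestamp is made up for the example.

```python
from datetime import datetime, timedelta, timezone


def deadline_times(queued_at, intervals):
    """Return one deadline timestamp per interval, measured from queued_at."""
    return [queued_at + interval for interval in intervals]


# The three intervals from the example above: 1, 2 and 3 hours.
intervals = [timedelta(hours=1), timedelta(hours=2), timedelta(hours=3)]
queued_at = datetime(2025, 5, 23, 8, 0, tzinfo=timezone.utc)

for when in deadline_times(queued_at, intervals):
    print(when.isoformat())
# 2025-05-23T09:00:00+00:00
# 2025-05-23T10:00:00+00:00
# 2025-05-23T11:00:00+00:00
```

Under that assumption, every run of `dag_with_deadlines` ends up with three separate deadlines, which is what the list-typed `deadlines` field on `DAGRunResponse` is meant to carry.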
Re: [PR] Aip 86: add deadlines to DagRunResponse [airflow]
bbovenzi commented on code in PR #50957: URL: https://github.com/apache/airflow/pull/50957#discussion_r2103034881 ## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py: ## @@ -68,6 +69,7 @@ class DAGRunResponse(BaseModel): end_date: datetime | None data_interval_start: datetime | None data_interval_end: datetime | None +deadlines: list[DeadlineResponse] | None Review Comment: Oh I missed this. A dag run can have multiple deadlines?
