Nataneljpwd commented on code in PR #61274:
URL: https://github.com/apache/airflow/pull/61274#discussion_r2778072973
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1245,22 +1252,31 @@ def recalculate(self) -> _UnfinishedStates:
self.set_state(DagRunState.SUCCESS)
self.notify_dagrun_state_changed(msg="success")
- if execute_callbacks and dag.has_on_success_callback:
- self.handle_dag_callback(dag=cast("SDKDAG", dag),
success=True, reason="success")
- elif dag.has_on_success_callback:
- callback = DagCallbackRequest(
- filepath=self.dag_model.relative_fileloc,
- dag_id=self.dag_id,
- run_id=self.run_id,
- bundle_name=self.dag_model.bundle_name,
- bundle_version=self.bundle_version,
- context_from_server=DagRunContext(
- dag_run=self,
- last_ti=self.get_last_ti(dag=dag, session=session),
- ),
- is_failure_callback=False,
- msg="success",
+ if dag.has_on_success_callback:
+ last_ti_to_run: TI | None = (
+ max(tis_for_dagrun_state, key=lambda ti: ti.end_date,
default=None)
)
+ if execute_callbacks:
Review Comment:
Same as the comment above
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1280,26 +1296,31 @@ def recalculate(self) -> _UnfinishedStates:
self.set_state(DagRunState.FAILED)
self.notify_dagrun_state_changed(msg="all_tasks_deadlocked")
- if execute_callbacks and dag.has_on_failure_callback:
- self.handle_dag_callback(
- dag=cast("SDKDAG", dag),
- success=False,
- reason="all_tasks_deadlocked",
- )
- elif dag.has_on_failure_callback:
- callback = DagCallbackRequest(
- filepath=self.dag_model.relative_fileloc,
- dag_id=self.dag_id,
- run_id=self.run_id,
- bundle_name=self.dag_model.bundle_name,
- bundle_version=self.bundle_version,
- context_from_server=DagRunContext(
- dag_run=self,
- last_ti=self.get_last_ti(dag=dag, session=session),
- ),
- is_failure_callback=True,
- msg="all_tasks_deadlocked",
+ if dag.has_on_failure_callback:
+ last_finished_ti: TI | None = (
+ max(info.finished_tis, key=lambda ti: ti.end_date,
default=None)
)
+ if execute_callbacks:
Review Comment:
Same here
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1212,22 +1212,29 @@ def recalculate(self) -> _UnfinishedStates:
self.set_state(DagRunState.FAILED)
self.notify_dagrun_state_changed(msg="task_failure")
- if execute_callbacks and dag.has_on_failure_callback:
- self.handle_dag_callback(dag=cast("SDKDAG", dag),
success=False, reason="task_failure")
- elif dag.has_on_failure_callback:
- callback = DagCallbackRequest(
- filepath=self.dag_model.relative_fileloc,
- dag_id=self.dag_id,
- run_id=self.run_id,
- bundle_name=self.dag_model.bundle_name,
- bundle_version=self.bundle_version,
- context_from_server=DagRunContext(
- dag_run=self,
- last_ti=self.get_last_ti(dag=dag, session=session),
- ),
- is_failure_callback=True,
- msg="task_failure",
- )
+ if dag.has_on_failure_callback:
+ ti_causing_failure =
self.get_first_ti_causing_failure(dag=dag, session=session)
+ if execute_callbacks:
Review Comment:
Why isn't this checked in the upper if statement? It can be less nested
while remaining readable, the else just becomes an else if for the upper if
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]