kaxil commented on code in PR #55238:
URL: https://github.com/apache/airflow/pull/55238#discussion_r2321783034
##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -335,6 +338,74 @@ def get_callback_representation(callback):
log.exception("Error in callback at index %d: %s", idx,
callback_repr)
+def _execute_email_callbacks(
+ dagbag: DagBag, request: EmailNotificationRequest, log:
FilteringBoundLogger
+) -> None:
+ """Execute email notification for task failure/retry."""
+ dag = dagbag.dags[request.ti.dag_id]
+ task = dag.get_task(request.ti.task_id)
+
+ if not task.email:
+ log.warning(
+ "Email callback requested but no email configured",
+ dag_id=request.ti.dag_id,
+ task_id=request.ti.task_id,
+ run_id=request.ti.run_id,
+ )
+ return
+
+ # Check if email should be sent based on task configuration
+ should_send_email = False
+ if request.email_type == "failure" and task.email_on_failure:
+ should_send_email = True
+ elif request.email_type == "retry" and task.email_on_retry:
+ should_send_email = True
+
+ if not should_send_email:
+ log.info(
+ "Email not sent - task configured with email_on_%s=False",
+ request.email_type,
+ dag_id=request.ti.dag_id,
+ task_id=request.ti.task_id,
+ run_id=request.ti.run_id,
+ )
+ return
+
+ ctx_from_server = request.context_from_server
+
+ if ctx_from_server is not None:
Review Comment:
Not anymore; fixed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]