dondaum opened a new pull request, #66702:
URL: https://github.com/apache/airflow/pull/66702

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   <!--
   Thank you for contributing!
   
   Please provide above a brief description of the changes made in this pull 
request.
   Write a good git commit message following this guide: 
http://chris.beams.io/posts/git-commit/
   
   Please make sure that your code changes are covered with tests.
   And in case of new features or big changes remember to adjust the 
documentation.
   
   Feel free to ping (in general) for the review if you do not see reaction for 
a few days
   (72 Hours is the minimum reaction time you can expect from volunteers) - we 
sometimes miss notifications.
   
   In case of an existing issue, reference it using one of the following:
   
   * closes: #ISSUE
   * related: #ISSUE
   -->
   
   This enables the use of Callables from the same Dag module for deadline 
alert callbacks. Airflow's DAG serialization adjusts the name of the Dag module 
during parsing to ensure its 
[uniqueness](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/file.py#L184).
 Consequently, once the callable is part of the same module, the module path 
can no longer be used.
   
   Currently, this works only if you define the Callable in a separate module 
and subsequently import it into the Dag module—as described 
[here](https://airflow.apache.org/docs/apache-airflow/stable/howto/deadline-alerts.html#using-callbacks).
   
   
   How to reproduce
   ```Python
   # sync_deadline_test.py
   from datetime import datetime, timedelta
   from airflow.sdk import SyncCallback, DAG, DeadlineAlert, DeadlineReference, 
task
   from airflow.providers.standard.operators.empty import EmptyOperator
   
   
   def run_sync_callback(text: str):
       print(text)
   
   
   with DAG(
       dag_id="custom_callback_deadline_alert_sync",
       deadline=DeadlineAlert(
           reference=DeadlineReference.DAGRUN_QUEUED_AT,
           interval=timedelta(seconds=10),
           callback=SyncCallback(
               run_sync_callback,
               kwargs={
                   "text": "🚨 Dag {{ dag_run.dag_id }} missed deadline at {{ 
deadline.deadline_time }}. DagRun: {{ dag_run }}"
               },
           ),
       ),
   ):
   
       c = EmptyOperator(task_id="example_task")
   
       @task()
       def wait():
           import time
           time.sleep(60)
   
       
       c >> wait()
   ```
   
   Logs
   ```shell
   2026-05-05 10:22:17.561532+00:00 [error    ] Callback execution failed      
[callback_runner] callback_kwargs={'text': '🚨 Dag {{ dag_run.dag_id }} missed 
deadline at {{ deadline.deadline_time }}. DagRun: {{ dag_run }}', 'context': 
{'dag_run': {'dag_run_id': 'manual__2026-05-05T10:22:06.225020+00:00', 
'dag_id': 'custom_callback_deadline_alert_sync', 'logical_date': 
'2026-05-05T10:22:06Z', 'queued_at': '2026-05-05T10:22:06.233422Z', 
'start_date': '2026-05-05T10:22:06.267176Z', 'end_date': None, 'duration': 
None, 'data_interval_start': '2026-05-05T10:22:06Z', 'data_interval_end': 
'2026-05-05T10:22:06Z', 'run_after': '2026-05-05T10:22:06.225020Z', 
'last_scheduling_decision': '2026-05-05T10:22:16.977390Z', 'run_type': 
'manual', 'state': 'running', 'triggered_by': 'ui', 'triggering_user_name': 
'admin', 'conf': {}, 'note': None, 'dag_versions': [{'id': 
'019df7a8-1f1a-771b-bdab-ed3e0499252d', 'version_number': 3, 'dag_id': 
'custom_callback_deadline_alert_sync', 'bundle_name': 'dags-fol
 der', 'bundle_version': None, 'created_at': '2026-05-05T10:21:23.610414Z', 
'dag_display_name': 'custom_callback_deadline_alert_sync', 'bundle_url': 
None}], 'bundle_version': None, 'dag_display_name': 
'custom_callback_deadline_alert_sync', 'partition_key': None}, 'deadline': 
{'id': '019df7a8-c5ac-787b-938b-4ab1a1b1d669', 'deadline_time': 
'2026-05-05T10:22:16.233422Z'}}} 
callback_path=unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test.run_sync_callback
 error_detail=[{'exc_type': 'ModuleNotFoundError', 'exc_value': "No module 
named 
'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'", 
'exc_notes': [], 'syntax_error': None, 'is_cause': False, 'frames': 
[{'filename': 
'/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py', 
'lineno': 111, 'name': 'execute_callback'}, {'filename': 
'/usr/python/lib/python3.10/importlib/__init__.py', 'lineno': 126, 'name': 
'import_module'}, {'filename': '<frozen importlib._bootstrap>', 'l
 ineno': 1050, 'name': '_gcd_import'}, {'filename': '<frozen 
importlib._bootstrap>', 'lineno': 1027, 'name': '_find_and_load'}, {'filename': 
'<frozen importlib._bootstrap>', 'lineno': 1004, 'name': 
'_find_and_load_unlocked'}], 'is_group': False, 'exceptions': []}] 
error_msg="Callback execution failed: ModuleNotFoundError: No module named 
'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'" 
loc=callback_supervisor.py:142
   2026-05-05 10:22:17.562123+00:00 [error    ] Callback failed                
[callback_runner] error="Callback execution failed: ModuleNotFoundError: No 
module named 
'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'" 
loc=callback_supervisor.py:218
   2026-05-05T10:22:17.566313Z [info     ] Workload finished              
[callback_supervisor] duration=0.06016514600014489 exit_code=1 
loc=callback_supervisor.py:368 workload_id=019df7a8-c5ab-7541-9d7f-c0ba0018c132 
workload_type=ExecutorCallback
   2026-05-05T10:22:17.566601Z [error    ] Workload execution failed.     
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:110 
workload_type=ExecuteCallback
   Traceback (most recent call last):
     File "/opt/airflow/airflow-core/src/airflow/executors/local_executor.py", 
line 102, in _run_worker
       BaseExecutor.run_workload(
     File "/opt/airflow/airflow-core/src/airflow/executors/base_executor.py", 
line 670, in run_workload
       return supervise_callback(
     File 
"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py", 
line 376, in supervise_callback
       raise RuntimeError(f"Callback subprocess exited with code {exit_code}")
   RuntimeError: Callback subprocess exited with code 1
   2026-05-05T10:22:18.504254Z [info     ] Received executor event with state 
failed for callback 019df7a8-c5ab-7541-9d7f-c0ba0018c132 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1271
   2026-05-05T10:22:18.506357Z [error    ] Callback 
019df7a8-c5ab-7541-9d7f-c0ba0018c132 failed: Execution failed 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1296
   2026-05-05T10:22:18.512786Z [info     ] Getting all Callbacks          
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1118
   2026-05-05T10:22:18.513512Z [info     ] executor                       
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1122
   2026-05-05T10:22:18.513625Z [info     ] failed                         
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1123
   2026-05-05T10:22:18.513704Z [info     ] 1                              
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1124
   2026-05-05T10:22:18.513773Z [info     ] {'path': 
'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test.run_sync_callback',
 'dag_id': 'custom_callback_deadline_alert_sync', 'kwargs': {'text': '🚨 Dag {{ 
dag_run.dag_id }} missed deadline at {{ deadline.deadline_time }}. DagRun: {{ 
dag_run }}'}, 'prefix': 'deadline_alerts', 'executor': None} 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1125
   2026-05-05T10:22:18.513858Z [info     ] Getting all Callbacks          
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1126
   2026-05-05T10:22:18.513924Z [info     ] No pending callb
   ```
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, 
please
   change below checkbox to `[X]` followed by the name of the tool, uncomment 
the "Generated-by".
   -->
   
   - [ ] Yes (please specify the tool below)
   
   <!--
   Generated-by: [Tool Name] following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   -->
   
   ---
   
   * Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information. Note: commit author/co-author name and email in commits 
become permanently public when merged.
   * For fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   * When adding dependency, check compliance with the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).
   * For significant user-facing changes create newsfragment: 
`{pr_number}.significant.rst`, in 
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
 You can add this file in a follow-up commit after the PR is created so you 
know the PR number.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to