This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e85717b8e21 AIP-86 - Add `callback_state` to deadline table (#53650)
e85717b8e21 is described below

commit e85717b8e21731b4445d10c37d9699e4f6f9d4c6
Author: Ramit Kataria <[email protected]>
AuthorDate: Thu Jul 24 14:47:41 2025 -0700

    AIP-86 - Add `callback_state` to deadline table (#53650)
    
    * AIP-86 - Add `callback_state` to deadline table
    
    Storing state of the deadline callback is needed to keep track of
    the deadlines that haven't been "marked as miss" by the scheduler yet.
    A non-empty `callback_state` means that the scheduler doesn't need to
    look at it in the next loop and it could just filter for deadlines that
    don't have a `callback_state` in the query for evaluating deadlines
    in the scheduler loop.
    
    Keeping track of the state is also helpful for monitoring, metrics and
    debugging for users.
    
    * Simplified and more robust error handling in handle_callback_event
    
    * Fix Python 3.10 compatibility issue
    
    * Combine failure state categories into 1
    
    * Remove completed TODO comment
    
    * Check for queued status in callback handler
    
    * Minor improvements
---
 airflow-core/docs/img/airflow_erd.sha256           |   2 +-
 airflow-core/docs/img/airflow_erd.svg              | 154 +++++++++++----------
 airflow-core/docs/migrations-ref.rst               |   4 +-
 .../0079_3_1_0_add_callback_state_to_deadline.py   |  56 ++++++++
 airflow-core/src/airflow/models/deadline.py        |  30 ++--
 airflow-core/src/airflow/triggers/deadline.py      |  26 +++-
 airflow-core/src/airflow/utils/db.py               |   2 +-
 airflow-core/tests/unit/models/test_deadline.py    |  39 ++++--
 airflow-core/tests/unit/triggers/test_deadline.py  |  33 +++--
 9 files changed, 238 insertions(+), 108 deletions(-)

diff --git a/airflow-core/docs/img/airflow_erd.sha256 
b/airflow-core/docs/img/airflow_erd.sha256
index e1bd6b784fc..bb0a75b55cd 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-cc20f5b01cbb926eef1576cd77b840849f4ded0848c4f1a582fec6d39fc2ddeb
\ No newline at end of file
+d2e81695973bf8b6b30e1f4543627547330ef531e50be633cf589fbdf639b0e8
\ No newline at end of file
diff --git a/airflow-core/docs/img/airflow_erd.svg 
b/airflow-core/docs/img/airflow_erd.svg
index adac6e6d54f..d2ce6af2ab9 100644
--- a/airflow-core/docs/img/airflow_erd.svg
+++ b/airflow-core/docs/img/airflow_erd.svg
@@ -453,24 +453,24 @@
 <!-- asset_trigger -->
 <g id="node14" class="node">
 <title>asset_trigger</title>
-<polygon fill="none" stroke="black" points="1802,-2931.5 1802,-2959.5 
2037,-2959.5 2037,-2931.5 1802,-2931.5"/>
-<text text-anchor="start" x="1860" y="-2942.7" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">asset_trigger</text>
-<polygon fill="none" stroke="black" points="1802,-2906.5 1802,-2931.5 
2037,-2931.5 2037,-2906.5 1802,-2906.5"/>
-<text text-anchor="start" x="1807" y="-2916.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">asset_id</text>
-<text text-anchor="start" x="1864" y="-2916.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1869" y="-2916.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1946" y="-2916.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1802,-2881.5 1802,-2906.5 
2037,-2906.5 2037,-2881.5 1802,-2881.5"/>
-<text text-anchor="start" x="1807" y="-2891.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">trigger_id</text>
-<text text-anchor="start" x="1874" y="-2891.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1879" y="-2891.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1956" y="-2891.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1802,-2934.5 1802,-2962.5 
2037,-2962.5 2037,-2934.5 1802,-2934.5"/>
+<text text-anchor="start" x="1860" y="-2945.7" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">asset_trigger</text>
+<polygon fill="none" stroke="black" points="1802,-2909.5 1802,-2934.5 
2037,-2934.5 2037,-2909.5 1802,-2909.5"/>
+<text text-anchor="start" x="1807" y="-2919.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">asset_id</text>
+<text text-anchor="start" x="1864" y="-2919.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1869" y="-2919.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1946" y="-2919.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1802,-2884.5 1802,-2909.5 
2037,-2909.5 2037,-2884.5 1802,-2884.5"/>
+<text text-anchor="start" x="1807" y="-2894.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">trigger_id</text>
+<text text-anchor="start" x="1874" y="-2894.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1879" y="-2894.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1956" y="-2894.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 </g>
 <!-- asset&#45;&#45;asset_trigger -->
 <g id="edge5" class="edge">
 <title>asset&#45;&#45;asset_trigger</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M734.05,-2898.19C782.26,-2936.35 841.26,-2975.23 902,-2996.5 
1069.55,-3055.19 1123.47,-3015.5 1301,-3015.5 1301,-3015.5 1301,-3015.5 
1512,-3015.5 1610.45,-3015.5 1718.88,-2988.69 1798.9,-2963.55"/>
-<text text-anchor="start" x="1767.9" y="-2967.35" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M734.05,-2898.19C782.26,-2936.35 841.26,-2975.23 902,-2996.5 
1069.55,-3055.19 1123.47,-3015.5 1301,-3015.5 1301,-3015.5 1301,-3015.5 
1512,-3015.5 1608.48,-3015.5 1714.75,-2990.48 1794.35,-2966.54"/>
+<text text-anchor="start" x="1763.35" y="-2970.34" font-family="Times,serif" 
font-size="14.00">0..N</text>
 <text text-anchor="start" x="724.05" y="-2901.99" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- asset_active -->
@@ -692,25 +692,25 @@
 <!-- dagrun_asset_event -->
 <g id="node21" class="node">
 <title>dagrun_asset_event</title>
-<polygon fill="none" stroke="black" points="1797,-2599.5 1797,-2627.5 
2042,-2627.5 2042,-2599.5 1797,-2599.5"/>
-<text text-anchor="start" x="1830" y="-2610.7" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dagrun_asset_event</text>
-<polygon fill="none" stroke="black" points="1797,-2574.5 1797,-2599.5 
2042,-2599.5 2042,-2574.5 1797,-2574.5"/>
-<text text-anchor="start" x="1802" y="-2584.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">dag_run_id</text>
-<text text-anchor="start" x="1879" y="-2584.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1884" y="-2584.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1961" y="-2584.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1797,-2549.5 1797,-2574.5 
2042,-2574.5 2042,-2549.5 1797,-2549.5"/>
-<text text-anchor="start" x="1802" y="-2559.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">event_id</text>
-<text text-anchor="start" x="1861" y="-2559.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1866" y="-2559.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1943" y="-2559.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1797,-2614.5 1797,-2642.5 
2042,-2642.5 2042,-2614.5 1797,-2614.5"/>
+<text text-anchor="start" x="1830" y="-2625.7" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dagrun_asset_event</text>
+<polygon fill="none" stroke="black" points="1797,-2589.5 1797,-2614.5 
2042,-2614.5 2042,-2589.5 1797,-2589.5"/>
+<text text-anchor="start" x="1802" y="-2599.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">dag_run_id</text>
+<text text-anchor="start" x="1879" y="-2599.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1884" y="-2599.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1961" y="-2599.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1797,-2564.5 1797,-2589.5 
2042,-2589.5 2042,-2564.5 1797,-2564.5"/>
+<text text-anchor="start" x="1802" y="-2574.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">event_id</text>
+<text text-anchor="start" x="1861" y="-2574.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1866" y="-2574.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1943" y="-2574.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 </g>
 <!-- asset_event&#45;&#45;dagrun_asset_event -->
 <g id="edge13" class="edge">
 <title>asset_event&#45;&#45;dagrun_asset_event</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M758.26,-3289.6C893.43,-3305.82 1102.1,-3313.63 1265,-3242.5 
1573.42,-3107.83 1814.81,-2754.8 1891.96,-2631.64"/>
-<text text-anchor="start" x="1860.96" y="-2635.44" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="758.26" y="-3278.4" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M758.08,-3289.19C893.12,-3305.1 1101.69,-3312.68 1265,-3242.5 
1569.94,-3111.45 1812.65,-2767.71 1891.13,-2646.54"/>
+<text text-anchor="start" x="1860.13" y="-2650.34" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="758.08" y="-3277.99" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- trigger -->
 <g id="node22" class="node">
@@ -745,9 +745,9 @@
 <!-- trigger&#45;&#45;asset_trigger -->
 <g id="edge14" class="edge">
 <title>trigger&#45;&#45;asset_trigger</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1559.57,-2313.11C1645.94,-2459.48 1825.8,-2764.25 1892.44,-2877.18"/>
-<text text-anchor="start" x="1861.44" y="-2865.98" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1549.57" y="-2316.91" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1558.13,-2313.03C1607.1,-2398.3 1686.93,-2536.72 1757,-2655.5 
1804.02,-2735.21 1859.82,-2827.62 1891.85,-2880.46"/>
+<text text-anchor="start" x="1860.85" y="-2869.26" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1548.13" y="-2316.83" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- task_instance -->
 <g id="node23" class="node">
@@ -922,46 +922,50 @@
 <!-- deadline -->
 <g id="node24" class="node">
 <title>deadline</title>
-<polygon fill="none" stroke="black" points="1777,-2262.5 1777,-2290.5 
2061,-2290.5 2061,-2262.5 1777,-2262.5"/>
-<text text-anchor="start" x="1881" y="-2273.7" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">deadline</text>
-<polygon fill="none" stroke="black" points="1777,-2237.5 1777,-2262.5 
2061,-2262.5 2061,-2237.5 1777,-2237.5"/>
-<text text-anchor="start" x="1782" y="-2247.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="1795" y="-2247.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1800" y="-2247.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
-<text text-anchor="start" x="1852" y="-2247.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1777,-2212.5 1777,-2237.5 
2061,-2237.5 2061,-2212.5 1777,-2212.5"/>
-<text text-anchor="start" x="1782" y="-2222.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">callback</text>
-<text text-anchor="start" x="1839" y="-2222.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1844" y="-2222.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
-<text text-anchor="start" x="1965" y="-2222.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1777,-2187.5 1777,-2212.5 
2061,-2212.5 2061,-2187.5 1777,-2187.5"/>
-<text text-anchor="start" x="1782" y="-2197.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">callback_kwargs</text>
-<text text-anchor="start" x="1895" y="-2197.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1900" y="-2197.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
-<polygon fill="none" stroke="black" points="1777,-2162.5 1777,-2187.5 
2061,-2187.5 2061,-2162.5 1777,-2162.5"/>
-<text text-anchor="start" x="1782" y="-2172.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
-<text text-anchor="start" x="1828" y="-2172.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1833" y="-2172.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="1777,-2137.5 1777,-2162.5 
2061,-2162.5 2061,-2137.5 1777,-2137.5"/>
-<text text-anchor="start" x="1782" y="-2147.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">dagrun_id</text>
-<text text-anchor="start" x="1852" y="-2147.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1857" y="-2147.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1777,-2112.5 1777,-2137.5 
2061,-2137.5 2061,-2112.5 1777,-2112.5"/>
-<text text-anchor="start" x="1782" y="-2122.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">deadline_time</text>
-<text text-anchor="start" x="1879" y="-2122.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1884" y="-2122.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="1980" y="-2122.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1777,-2087.5 1777,-2112.5 
2061,-2112.5 2061,-2087.5 1777,-2087.5"/>
-<text text-anchor="start" x="1782" y="-2097.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_id</text>
-<text text-anchor="start" x="1849" y="-2097.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1854" y="-2097.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1777,-2288.5 1777,-2316.5 
2061,-2316.5 2061,-2288.5 1777,-2288.5"/>
+<text text-anchor="start" x="1881" y="-2299.7" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">deadline</text>
+<polygon fill="none" stroke="black" points="1777,-2263.5 1777,-2288.5 
2061,-2288.5 2061,-2263.5 1777,-2263.5"/>
+<text text-anchor="start" x="1782" y="-2273.3" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="1795" y="-2273.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1800" y="-2273.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
+<text text-anchor="start" x="1852" y="-2273.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1777,-2238.5 1777,-2263.5 
2061,-2263.5 2061,-2238.5 1777,-2238.5"/>
+<text text-anchor="start" x="1782" y="-2248.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">callback</text>
+<text text-anchor="start" x="1839" y="-2248.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1844" y="-2248.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
+<text text-anchor="start" x="1965" y="-2248.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1777,-2213.5 1777,-2238.5 
2061,-2238.5 2061,-2213.5 1777,-2213.5"/>
+<text text-anchor="start" x="1782" y="-2223.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">callback_kwargs</text>
+<text text-anchor="start" x="1895" y="-2223.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1900" y="-2223.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
+<polygon fill="none" stroke="black" points="1777,-2188.5 1777,-2213.5 
2061,-2213.5 2061,-2188.5 1777,-2188.5"/>
+<text text-anchor="start" x="1782" y="-2198.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">callback_state</text>
+<text text-anchor="start" x="1880" y="-2198.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1885" y="-2198.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(20)]</text>
+<polygon fill="none" stroke="black" points="1777,-2163.5 1777,-2188.5 
2061,-2188.5 2061,-2163.5 1777,-2163.5"/>
+<text text-anchor="start" x="1782" y="-2173.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
+<text text-anchor="start" x="1828" y="-2173.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1833" y="-2173.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<polygon fill="none" stroke="black" points="1777,-2138.5 1777,-2163.5 
2061,-2163.5 2061,-2138.5 1777,-2138.5"/>
+<text text-anchor="start" x="1782" y="-2148.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">dagrun_id</text>
+<text text-anchor="start" x="1852" y="-2148.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1857" y="-2148.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1777,-2113.5 1777,-2138.5 
2061,-2138.5 2061,-2113.5 1777,-2113.5"/>
+<text text-anchor="start" x="1782" y="-2123.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">deadline_time</text>
+<text text-anchor="start" x="1879" y="-2123.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1884" y="-2123.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="1980" y="-2123.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1777,-2088.5 1777,-2113.5 
2061,-2113.5 2061,-2088.5 1777,-2088.5"/>
+<text text-anchor="start" x="1782" y="-2098.3" 
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_id</text>
+<text text-anchor="start" x="1849" y="-2098.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1854" y="-2098.3" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
 </g>
 <!-- trigger&#45;&#45;deadline -->
 <g id="edge16" class="edge">
 <title>trigger&#45;&#45;deadline</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1663.18,-2216.49C1697.67,-2212.84 1734.31,-2208.96 1768.72,-2205.31"/>
-<text text-anchor="start" x="1737.72" y="-2194.11" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1663.18" y="-2205.29" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1663.18,-2221.33C1697.67,-2218.78 1734.31,-2216.07 1768.72,-2213.53"/>
+<text text-anchor="start" x="1737.72" y="-2202.33" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1663.18" y="-2210.13" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
 </g>
 <!-- hitl_detail -->
 <g id="node41" class="node">
@@ -1668,8 +1672,8 @@
 <!-- dag&#45;&#45;deadline -->
 <g id="edge30" class="edge">
 <title>dag&#45;&#45;deadline</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M829.4,-1875.53C990.95,-1841.59 1197.83,-1811.55 1265,-1861.5 
1355.49,-1928.8 1251.33,-2032.36 1338,-2104.5 1397.45,-2153.99 1607.56,-2130.65 
1684,-2142.5 1711.54,-2146.77 1740.75,-2152.04 1768.87,-2157.49"/>
-<text text-anchor="start" x="1737.87" y="-2146.29" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M829.4,-1875.53C990.95,-1841.59 1197.83,-1811.55 1265,-1861.5 
1355.49,-1928.8 1251.33,-2032.36 1338,-2104.5 1397.45,-2153.99 1607.9,-2128.64 
1684,-2142.5 1711.64,-2147.54 1740.85,-2154.05 1768.9,-2160.91"/>
+<text text-anchor="start" x="1737.9" y="-2149.71" font-family="Times,serif" 
font-size="14.00">0..N</text>
 <text text-anchor="start" x="829.4" y="-1864.33" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
 </g>
 <!-- dag_schedule_asset_name_reference -->
@@ -2119,9 +2123,9 @@
 <!-- dag_run&#45;&#45;dagrun_asset_event -->
 <g id="edge36" class="edge">
 <title>dag_run&#45;&#45;dagrun_asset_event</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1667.01,-1539.94C1673.31,-1558.88 1679.05,-1577.81 1684,-1596.5 
1764.94,-1901.85 1661.07,-2002.53 1757,-2303.5 1786.42,-2395.82 
1848.07,-2491.58 1885.97,-2545.26"/>
-<text text-anchor="start" x="1854.97" y="-2534.06" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1667.01" y="-1528.74" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1667,-1539.65C1673.32,-1558.68 1679.06,-1577.71 1684,-1596.5 
1767.32,-1913.11 1655.49,-2018.25 1757,-2329.5 1785.83,-2417.89 
1846.46,-2508.63 1884.53,-2560.33"/>
+<text text-anchor="start" x="1853.53" y="-2549.13" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1667" y="-1528.45" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- dag_run&#45;&#45;task_instance -->
 <g id="edge37" class="edge">
@@ -2140,8 +2144,8 @@
 <!-- dag_run&#45;&#45;deadline -->
 <g id="edge39" class="edge">
 <title>dag_run&#45;&#45;deadline</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1667.08,-1388.7C1688.66,-1422.11 1707.89,-1458.02 1721,-1494.5 
1764.66,-1616.03 1694.38,-1961.56 1757,-2074.5 1760.5,-2080.81 1764.51,-2086.86 
1768.92,-2092.64"/>
-<text text-anchor="start" x="1737.92" y="-2081.44" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1667.08,-1388.7C1688.66,-1422.11 1707.89,-1458.02 1721,-1494.5 
1764.66,-1616.03 1697.31,-1959.99 1757,-2074.5 1760.47,-2081.15 
1764.45,-2087.57 1768.84,-2093.75"/>
+<text text-anchor="start" x="1737.84" y="-2082.55" font-family="Times,serif" 
font-size="14.00">0..N</text>
 <text text-anchor="start" x="1667.08" y="-1377.5" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
 </g>
 <!-- backfill_dag_run -->
diff --git a/airflow-core/docs/migrations-ref.rst 
b/airflow-core/docs/migrations-ref.rst
index 508b588ec7e..dce45f2cb0a 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``09fa89ba1710`` (head) | ``40f7c30a228b`` | ``3.1.0``         | Add 
trigger_id to deadline.                                  |
+| ``f56f68b9e02f`` (head) | ``09fa89ba1710`` | ``3.1.0``         | Add 
callback_state to deadline.                              |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``09fa89ba1710``        | ``40f7c30a228b`` | ``3.1.0``         | Add 
trigger_id to deadline.                                  |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``40f7c30a228b``        | ``5d3072c51bac`` | ``3.1.0``         | Add Human 
In the Loop Detail table.                          |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git 
a/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py
 
b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py
new file mode 100644
index 00000000000..cd66ae74fe0
--- /dev/null
+++ 
b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add callback_state to deadline.
+
+Revision ID: f56f68b9e02f
+Revises: 09fa89ba1710
+Create Date: 2025-07-22 17:46:40.122517
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "f56f68b9e02f"
+down_revision = "09fa89ba1710"
+branch_labels = None
+depends_on = None
+airflow_version = "3.1.0"
+
+
+def upgrade():
+    """Add callback_state to deadline."""
+    with op.batch_alter_table("deadline", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("callback_state", sa.String(length=20), 
nullable=True))
+        batch_op.drop_index(batch_op.f("deadline_time_idx"))
+        batch_op.create_index(
+            "deadline_callback_state_time_idx", ["callback_state", 
"deadline_time"], unique=False
+        )
+
+
+def downgrade():
+    """Remove callback_state from deadline."""
+    with op.batch_alter_table("deadline", schema=None) as batch_op:
+        batch_op.drop_index("deadline_callback_state_time_idx")
+        batch_op.create_index(batch_op.f("deadline_time_idx"), 
["deadline_time"], unique=False)
+        batch_op.drop_column("callback_state")
diff --git a/airflow-core/src/airflow/models/deadline.py 
b/airflow-core/src/airflow/models/deadline.py
index a9e8cdabbf0..ae8b8a79f9a 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -20,6 +20,7 @@ import logging
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from datetime import datetime, timedelta
+from enum import Enum
 from typing import TYPE_CHECKING, Any
 
 import sqlalchemy_jsonfield
@@ -33,7 +34,7 @@ from airflow._shared.timezones import timezone
 from airflow.models import Trigger
 from airflow.models.base import Base, StringID
 from airflow.settings import json
-from airflow.triggers.deadline import DeadlineCallbackTrigger
+from airflow.triggers.deadline import PAYLOAD_STATUS_KEY, 
DeadlineCallbackTrigger
 from airflow.utils.decorators import classproperty
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
@@ -48,6 +49,14 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+class DeadlineCallbackState(str, Enum):
+    """All possible states of deadline callbacks."""
+
+    QUEUED = "queued"
+    SUCCESS = "success"
+    FAILED = "failed"
+
+
 class Deadline(Base):
     """A Deadline is a 'need-by' date which triggers a callback if the 
provided time has passed."""
 
@@ -65,6 +74,8 @@ class Deadline(Base):
     callback = Column(String(500), nullable=False)
     # Serialized kwargs to pass to the callback.
     callback_kwargs = Column(sqlalchemy_jsonfield.JSONField(json=json))
+    # The state of the deadline callback
+    callback_state = Column(String(20))
 
     dagrun = relationship("DagRun", back_populates="deadlines")
 
@@ -72,7 +83,7 @@ class Deadline(Base):
     trigger_id = Column(Integer, ForeignKey("trigger.id"), nullable=True)
     trigger = relationship("Trigger", back_populates="deadline")
 
-    __table_args__ = (Index("deadline_time_idx", deadline_time, unique=False),)
+    __table_args__ = (Index("deadline_callback_state_time_idx", 
callback_state, deadline_time, unique=False),)
 
     def __init__(
         self,
@@ -115,7 +126,6 @@ class Deadline(Base):
         NOTE: This should only be used to remove deadlines which are 
associated with
             successful DagRuns. If the deadline was missed, it will be handled 
by the
             scheduler.
-        TODO:  Create the missed_deadlines table (Ramit)
 
         :param conditions: Dictionary of conditions to evaluate against.
         :param session: Session to use.
@@ -159,7 +169,7 @@ class Deadline(Base):
         return deleted_count
 
     def handle_miss(self, session: Session):
-        """Handle a missed deadline by queueing the callback and marking the 
deadline as missed."""
+        """Handle a missed deadline by creating a trigger to run the 
callback."""
         # TODO: check to see if the callback is meant to run in triggerer or 
executor. For now, the code below assumes it's for the triggerer
         callback_trigger = DeadlineCallbackTrigger(
             callback_path=self.callback,
@@ -170,17 +180,19 @@ class Deadline(Base):
         session.add(trigger_orm)
         session.flush()
         self.trigger_id = trigger_orm.id
+        self.callback_state = DeadlineCallbackState.QUEUED
         session.add(self)
 
-        # TODO mark deadline as missed
-
     def handle_callback_event(self, event: TriggerEvent, session: Session):
-        if event.payload["status"] == "success":
-            logger.debug("Deadline callback succeeded")
+        if (status := event.payload.get(PAYLOAD_STATUS_KEY)) and status in {
+            DeadlineCallbackState.SUCCESS,
+            DeadlineCallbackState.FAILED,
+        }:
             self.trigger = None
+            self.callback_state = event.payload[PAYLOAD_STATUS_KEY]
             session.add(self)
         else:
-            logger.error("Unexpected event received: %s", event)
+            logger.error("Unexpected event received: %s", event.payload)
 
 
 class ReferenceModels:
diff --git a/airflow-core/src/airflow/triggers/deadline.py 
b/airflow-core/src/airflow/triggers/deadline.py
index c65f173baf1..d04910dd8b8 100644
--- a/airflow-core/src/airflow/triggers/deadline.py
+++ b/airflow-core/src/airflow/triggers/deadline.py
@@ -24,7 +24,10 @@ from typing import Any
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.utils.module_loading import import_string
 
-logger = logging.getLogger(__name__)
+log = logging.getLogger(__name__)
+
+PAYLOAD_STATUS_KEY = "state"
+PAYLOAD_BODY_KEY = "body"
 
 
 class DeadlineCallbackTrigger(BaseTrigger):
@@ -42,6 +45,21 @@ class DeadlineCallbackTrigger(BaseTrigger):
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        callback = import_string(self.callback_path)
-        result = await callback(**self.callback_kwargs)
-        yield TriggerEvent({"status": "success", "result": result})
+        from airflow.models.deadline import DeadlineCallbackState  # to avoid 
cyclic imports
+
+        try:
+            callback = import_string(self.callback_path)
+            result = await callback(**self.callback_kwargs)
+            log.info("Deadline callback completed with return value: %s", 
result)
+            yield TriggerEvent({PAYLOAD_STATUS_KEY: 
DeadlineCallbackState.SUCCESS, PAYLOAD_BODY_KEY: result})
+        except Exception as e:
+            if isinstance(e, ImportError):
+                message = "Could not import deadline callback on the triggerer"
+            elif isinstance(e, TypeError) and "await" in str(e):
+                message = "Deadline callback not awaitable"
+            else:
+                message = "An error occurred while executing deadline callback"
+            log.exception("%s: %s", message, e)
+            yield TriggerEvent(
+                {PAYLOAD_STATUS_KEY: DeadlineCallbackState.FAILED, 
PAYLOAD_BODY_KEY: f"{message}: {e}"}
+            )
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index 39ddc42b339..d6e15757460 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -93,7 +93,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "2.10.3": "5f2621c13b39",
     "3.0.0": "29ce7909c52b",
     "3.0.3": "fe199e1abd77",
-    "3.1.0": "09fa89ba1710",
+    "3.1.0": "f56f68b9e02f",
 }
 
 
diff --git a/airflow-core/tests/unit/models/test_deadline.py 
b/airflow-core/tests/unit/models/test_deadline.py
index ae529f45e77..ef656f7455a 100644
--- a/airflow-core/tests/unit/models/test_deadline.py
+++ b/airflow-core/tests/unit/models/test_deadline.py
@@ -26,10 +26,11 @@ from sqlalchemy import select
 from sqlalchemy.exc import SQLAlchemyError
 
 from airflow.models import DagRun, Trigger
-from airflow.models.deadline import Deadline, ReferenceModels, _fetch_from_db
+from airflow.models.deadline import Deadline, DeadlineCallbackState, 
ReferenceModels, _fetch_from_db
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.deadline import DeadlineReference
 from airflow.triggers.base import TriggerEvent
+from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY
 from airflow.utils.state import DagRunState
 
 from tests_common.test_utils import db
@@ -210,13 +211,31 @@ class TestDeadline:
 
     @pytest.mark.db_test
     @pytest.mark.parametrize(
-        "event, none_trigger_id_expected",
+        "event, none_trigger_expected",
         [
-            pytest.param(TriggerEvent({"status": "success"}), True, 
id="success_event"),
-            pytest.param(TriggerEvent({"status": "some status"}), False, 
id="unknown_event"),
+            pytest.param(
+                TriggerEvent(
+                    {PAYLOAD_STATUS_KEY: DeadlineCallbackState.SUCCESS, 
PAYLOAD_BODY_KEY: "test_result"}
+                ),
+                True,
+                id="success_event",
+            ),
+            pytest.param(
+                TriggerEvent(
+                    {PAYLOAD_STATUS_KEY: DeadlineCallbackState.FAILED, 
PAYLOAD_BODY_KEY: "RuntimeError"}
+                ),
+                True,
+                id="failed_event",
+            ),
+            pytest.param(
+                TriggerEvent({PAYLOAD_STATUS_KEY: 
DeadlineCallbackState.QUEUED, PAYLOAD_BODY_KEY: ""}),
+                False,
+                id="invalid_event",
+            ),
+            pytest.param(TriggerEvent({PAYLOAD_STATUS_KEY: "unknown_state"}), 
False, id="unknown_event"),
         ],
     )
-    def test_handle_callback_event(self, dagrun, session, event, 
none_trigger_id_expected):
+    def test_handle_callback_event(self, dagrun, session, event, 
none_trigger_expected):
         deadline_orm = Deadline(
             deadline_time=DEFAULT_DATE,
             callback=TEST_CALLBACK_PATH,
@@ -231,10 +250,14 @@ class TestDeadline:
 
         deadline_orm.handle_callback_event(event, session)
         session.flush()
-        if none_trigger_id_expected:
-            assert deadline_orm.trigger_id is None
+
+        assert none_trigger_expected == (deadline_orm.trigger is None)
+
+        status = event.payload[PAYLOAD_STATUS_KEY]
+        if status in set(DeadlineCallbackState):
+            assert deadline_orm.callback_state == status
         else:
-            assert deadline_orm.trigger_id is not None
+            assert deadline_orm.callback_state == DeadlineCallbackState.QUEUED
 
 
 @pytest.mark.db_test
diff --git a/airflow-core/tests/unit/triggers/test_deadline.py 
b/airflow-core/tests/unit/triggers/test_deadline.py
index e1d1d797f17..869ebffda85 100644
--- a/airflow-core/tests/unit/triggers/test_deadline.py
+++ b/airflow-core/tests/unit/triggers/test_deadline.py
@@ -21,13 +21,20 @@ from unittest import mock
 
 import pytest
 
-from airflow.triggers.deadline import DeadlineCallbackTrigger
+from airflow.models.deadline import DeadlineCallbackState
+from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY, 
DeadlineCallbackTrigger
 
 TEST_CALLBACK_PATH = "classpath.test_callback_for_deadline"
 TEST_CALLBACK_KWARGS = {"arg1": "value1"}
+TEST_TRIGGER = DeadlineCallbackTrigger(callback_path=TEST_CALLBACK_PATH, 
callback_kwargs=TEST_CALLBACK_KWARGS)
 
 
 class TestDeadlineCallbackTrigger:
+    @pytest.fixture
+    def mock_import_string(self):
+        with mock.patch("airflow.triggers.deadline.import_string") as m:
+            yield m
+
     @pytest.mark.parametrize(
         "callback_init_kwargs,expected_serialized_kwargs",
         [
@@ -49,20 +56,28 @@ class TestDeadlineCallbackTrigger:
         }
 
     @pytest.mark.asyncio
-    @mock.patch("airflow.triggers.deadline.import_string")
-    async def test_run(self, mock_import_string):
+    async def test_run_success(self, mock_import_string):
         callback_return_value = "some value"
         mock_callback = mock.AsyncMock(return_value=callback_return_value)
         mock_import_string.return_value = mock_callback
 
-        trigger = DeadlineCallbackTrigger(
-            callback_path=TEST_CALLBACK_PATH,
-            callback_kwargs=TEST_CALLBACK_KWARGS,
-        )
+        event = await TEST_TRIGGER.run().asend(None)
+
+        mock_import_string.assert_called_once_with(TEST_CALLBACK_PATH)
+        mock_callback.assert_called_once_with(**TEST_CALLBACK_KWARGS)
+
+        assert event.payload[PAYLOAD_STATUS_KEY] == 
DeadlineCallbackState.SUCCESS
+        assert event.payload[PAYLOAD_BODY_KEY] == callback_return_value
+
+    @pytest.mark.asyncio
+    async def test_run_failure(self, mock_import_string):
+        mock_callback = mock.AsyncMock(side_effect=RuntimeError("Something 
went wrong"))
+        mock_import_string.return_value = mock_callback
 
-        event = await trigger.run().asend(None)
+        event = await TEST_TRIGGER.run().asend(None)
 
         mock_import_string.assert_called_once_with(TEST_CALLBACK_PATH)
         mock_callback.assert_called_once_with(**TEST_CALLBACK_KWARGS)
 
-        assert event.payload == {"status": "success", "result": 
callback_return_value}
+        assert event.payload[PAYLOAD_STATUS_KEY] == 
DeadlineCallbackState.FAILED
+        assert PAYLOAD_BODY_KEY in event.payload


Reply via email to