This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e85717b8e21 AIP-86 - Add `callback_state` to deadline table (#53650)
e85717b8e21 is described below
commit e85717b8e21731b4445d10c37d9699e4f6f9d4c6
Author: Ramit Kataria <[email protected]>
AuthorDate: Thu Jul 24 14:47:41 2025 -0700
AIP-86 - Add `callback_state` to deadline table (#53650)
* AIP-86 - Add `callback_state` to deadline table
Storing state of the deadline callback is needed to keep track of
the deadlines that haven't been "marked as miss" by the scheduler yet.
A non-empty `callback_state` means that the scheduler doesn't need to
look at it in the next loop and it could just filter for deadlines that
don't have a `callback_state` in the query for evaluating deadlines
in the scheduler loop.
Keeping track of the state is also helpful for monitoring, metrics and
debugging for users.
* Simplified and more robust error handling in handle_callback_event
* Fix Python 3.10 compatibility issue
* Combine failure state categories into 1
* Remove completed TODO comment
* Check for queued status in callback handler
* Minor improvements
---
airflow-core/docs/img/airflow_erd.sha256 | 2 +-
airflow-core/docs/img/airflow_erd.svg | 154 +++++++++++----------
airflow-core/docs/migrations-ref.rst | 4 +-
.../0079_3_1_0_add_callback_state_to_deadline.py | 56 ++++++++
airflow-core/src/airflow/models/deadline.py | 30 ++--
airflow-core/src/airflow/triggers/deadline.py | 26 +++-
airflow-core/src/airflow/utils/db.py | 2 +-
airflow-core/tests/unit/models/test_deadline.py | 39 ++++--
airflow-core/tests/unit/triggers/test_deadline.py | 33 +++--
9 files changed, 238 insertions(+), 108 deletions(-)
diff --git a/airflow-core/docs/img/airflow_erd.sha256
b/airflow-core/docs/img/airflow_erd.sha256
index e1bd6b784fc..bb0a75b55cd 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-cc20f5b01cbb926eef1576cd77b840849f4ded0848c4f1a582fec6d39fc2ddeb
\ No newline at end of file
+d2e81695973bf8b6b30e1f4543627547330ef531e50be633cf589fbdf639b0e8
\ No newline at end of file
diff --git a/airflow-core/docs/img/airflow_erd.svg
b/airflow-core/docs/img/airflow_erd.svg
index adac6e6d54f..d2ce6af2ab9 100644
--- a/airflow-core/docs/img/airflow_erd.svg
+++ b/airflow-core/docs/img/airflow_erd.svg
@@ -453,24 +453,24 @@
<!-- asset_trigger -->
<g id="node14" class="node">
<title>asset_trigger</title>
-<polygon fill="none" stroke="black" points="1802,-2931.5 1802,-2959.5
2037,-2959.5 2037,-2931.5 1802,-2931.5"/>
-<text text-anchor="start" x="1860" y="-2942.7"
font-family="Helvetica,sans-Serif" font-weight="bold"
font-size="16.00">asset_trigger</text>
-<polygon fill="none" stroke="black" points="1802,-2906.5 1802,-2931.5
2037,-2931.5 2037,-2906.5 1802,-2906.5"/>
-<text text-anchor="start" x="1807" y="-2916.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">asset_id</text>
-<text text-anchor="start" x="1864" y="-2916.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1869" y="-2916.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1946" y="-2916.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1802,-2881.5 1802,-2906.5
2037,-2906.5 2037,-2881.5 1802,-2881.5"/>
-<text text-anchor="start" x="1807" y="-2891.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">trigger_id</text>
-<text text-anchor="start" x="1874" y="-2891.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1879" y="-2891.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1956" y="-2891.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1802,-2934.5 1802,-2962.5
2037,-2962.5 2037,-2934.5 1802,-2934.5"/>
+<text text-anchor="start" x="1860" y="-2945.7"
font-family="Helvetica,sans-Serif" font-weight="bold"
font-size="16.00">asset_trigger</text>
+<polygon fill="none" stroke="black" points="1802,-2909.5 1802,-2934.5
2037,-2934.5 2037,-2909.5 1802,-2909.5"/>
+<text text-anchor="start" x="1807" y="-2919.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">asset_id</text>
+<text text-anchor="start" x="1864" y="-2919.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1869" y="-2919.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1946" y="-2919.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1802,-2884.5 1802,-2909.5
2037,-2909.5 2037,-2884.5 1802,-2884.5"/>
+<text text-anchor="start" x="1807" y="-2894.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">trigger_id</text>
+<text text-anchor="start" x="1874" y="-2894.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1879" y="-2894.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1956" y="-2894.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
</g>
<!-- asset--asset_trigger -->
<g id="edge5" class="edge">
<title>asset--asset_trigger</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M734.05,-2898.19C782.26,-2936.35 841.26,-2975.23 902,-2996.5
1069.55,-3055.19 1123.47,-3015.5 1301,-3015.5 1301,-3015.5 1301,-3015.5
1512,-3015.5 1610.45,-3015.5 1718.88,-2988.69 1798.9,-2963.55"/>
-<text text-anchor="start" x="1767.9" y="-2967.35" font-family="Times,serif"
font-size="14.00">0..N</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M734.05,-2898.19C782.26,-2936.35 841.26,-2975.23 902,-2996.5
1069.55,-3055.19 1123.47,-3015.5 1301,-3015.5 1301,-3015.5 1301,-3015.5
1512,-3015.5 1608.48,-3015.5 1714.75,-2990.48 1794.35,-2966.54"/>
+<text text-anchor="start" x="1763.35" y="-2970.34" font-family="Times,serif"
font-size="14.00">0..N</text>
<text text-anchor="start" x="724.05" y="-2901.99" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- asset_active -->
@@ -692,25 +692,25 @@
<!-- dagrun_asset_event -->
<g id="node21" class="node">
<title>dagrun_asset_event</title>
-<polygon fill="none" stroke="black" points="1797,-2599.5 1797,-2627.5
2042,-2627.5 2042,-2599.5 1797,-2599.5"/>
-<text text-anchor="start" x="1830" y="-2610.7"
font-family="Helvetica,sans-Serif" font-weight="bold"
font-size="16.00">dagrun_asset_event</text>
-<polygon fill="none" stroke="black" points="1797,-2574.5 1797,-2599.5
2042,-2599.5 2042,-2574.5 1797,-2574.5"/>
-<text text-anchor="start" x="1802" y="-2584.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">dag_run_id</text>
-<text text-anchor="start" x="1879" y="-2584.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1884" y="-2584.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1961" y="-2584.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1797,-2549.5 1797,-2574.5
2042,-2574.5 2042,-2549.5 1797,-2549.5"/>
-<text text-anchor="start" x="1802" y="-2559.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">event_id</text>
-<text text-anchor="start" x="1861" y="-2559.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1866" y="-2559.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1943" y="-2559.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1797,-2614.5 1797,-2642.5
2042,-2642.5 2042,-2614.5 1797,-2614.5"/>
+<text text-anchor="start" x="1830" y="-2625.7"
font-family="Helvetica,sans-Serif" font-weight="bold"
font-size="16.00">dagrun_asset_event</text>
+<polygon fill="none" stroke="black" points="1797,-2589.5 1797,-2614.5
2042,-2614.5 2042,-2589.5 1797,-2589.5"/>
+<text text-anchor="start" x="1802" y="-2599.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">dag_run_id</text>
+<text text-anchor="start" x="1879" y="-2599.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1884" y="-2599.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1961" y="-2599.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1797,-2564.5 1797,-2589.5
2042,-2589.5 2042,-2564.5 1797,-2564.5"/>
+<text text-anchor="start" x="1802" y="-2574.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">event_id</text>
+<text text-anchor="start" x="1861" y="-2574.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1866" y="-2574.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1943" y="-2574.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
</g>
<!-- asset_event--dagrun_asset_event -->
<g id="edge13" class="edge">
<title>asset_event--dagrun_asset_event</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M758.26,-3289.6C893.43,-3305.82 1102.1,-3313.63 1265,-3242.5
1573.42,-3107.83 1814.81,-2754.8 1891.96,-2631.64"/>
-<text text-anchor="start" x="1860.96" y="-2635.44" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="758.26" y="-3278.4" font-family="Times,serif"
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M758.08,-3289.19C893.12,-3305.1 1101.69,-3312.68 1265,-3242.5
1569.94,-3111.45 1812.65,-2767.71 1891.13,-2646.54"/>
+<text text-anchor="start" x="1860.13" y="-2650.34" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="758.08" y="-3277.99" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- trigger -->
<g id="node22" class="node">
@@ -745,9 +745,9 @@
<!-- trigger--asset_trigger -->
<g id="edge14" class="edge">
<title>trigger--asset_trigger</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1559.57,-2313.11C1645.94,-2459.48 1825.8,-2764.25 1892.44,-2877.18"/>
-<text text-anchor="start" x="1861.44" y="-2865.98" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1549.57" y="-2316.91" font-family="Times,serif"
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1558.13,-2313.03C1607.1,-2398.3 1686.93,-2536.72 1757,-2655.5
1804.02,-2735.21 1859.82,-2827.62 1891.85,-2880.46"/>
+<text text-anchor="start" x="1860.85" y="-2869.26" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1548.13" y="-2316.83" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance -->
<g id="node23" class="node">
@@ -922,46 +922,50 @@
<!-- deadline -->
<g id="node24" class="node">
<title>deadline</title>
-<polygon fill="none" stroke="black" points="1777,-2262.5 1777,-2290.5
2061,-2290.5 2061,-2262.5 1777,-2262.5"/>
-<text text-anchor="start" x="1881" y="-2273.7"
font-family="Helvetica,sans-Serif" font-weight="bold"
font-size="16.00">deadline</text>
-<polygon fill="none" stroke="black" points="1777,-2237.5 1777,-2262.5
2061,-2262.5 2061,-2237.5 1777,-2237.5"/>
-<text text-anchor="start" x="1782" y="-2247.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">id</text>
-<text text-anchor="start" x="1795" y="-2247.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1800" y="-2247.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
-<text text-anchor="start" x="1852" y="-2247.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1777,-2212.5 1777,-2237.5
2061,-2237.5 2061,-2212.5 1777,-2212.5"/>
-<text text-anchor="start" x="1782" y="-2222.3"
font-family="Helvetica,sans-Serif" font-size="14.00">callback</text>
-<text text-anchor="start" x="1839" y="-2222.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1844" y="-2222.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
-<text text-anchor="start" x="1965" y="-2222.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1777,-2187.5 1777,-2212.5
2061,-2212.5 2061,-2187.5 1777,-2187.5"/>
-<text text-anchor="start" x="1782" y="-2197.3"
font-family="Helvetica,sans-Serif" font-size="14.00">callback_kwargs</text>
-<text text-anchor="start" x="1895" y="-2197.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1900" y="-2197.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
-<polygon fill="none" stroke="black" points="1777,-2162.5 1777,-2187.5
2061,-2187.5 2061,-2162.5 1777,-2162.5"/>
-<text text-anchor="start" x="1782" y="-2172.3"
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
-<text text-anchor="start" x="1828" y="-2172.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1833" y="-2172.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="1777,-2137.5 1777,-2162.5
2061,-2162.5 2061,-2137.5 1777,-2137.5"/>
-<text text-anchor="start" x="1782" y="-2147.3"
font-family="Helvetica,sans-Serif" font-size="14.00">dagrun_id</text>
-<text text-anchor="start" x="1852" y="-2147.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1857" y="-2147.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1777,-2112.5 1777,-2137.5
2061,-2137.5 2061,-2112.5 1777,-2112.5"/>
-<text text-anchor="start" x="1782" y="-2122.3"
font-family="Helvetica,sans-Serif" font-size="14.00">deadline_time</text>
-<text text-anchor="start" x="1879" y="-2122.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1884" y="-2122.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="1980" y="-2122.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1777,-2087.5 1777,-2112.5
2061,-2112.5 2061,-2087.5 1777,-2087.5"/>
-<text text-anchor="start" x="1782" y="-2097.3"
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_id</text>
-<text text-anchor="start" x="1849" y="-2097.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1854" y="-2097.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1777,-2288.5 1777,-2316.5
2061,-2316.5 2061,-2288.5 1777,-2288.5"/>
+<text text-anchor="start" x="1881" y="-2299.7"
font-family="Helvetica,sans-Serif" font-weight="bold"
font-size="16.00">deadline</text>
+<polygon fill="none" stroke="black" points="1777,-2263.5 1777,-2288.5
2061,-2288.5 2061,-2263.5 1777,-2263.5"/>
+<text text-anchor="start" x="1782" y="-2273.3"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">id</text>
+<text text-anchor="start" x="1795" y="-2273.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1800" y="-2273.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
+<text text-anchor="start" x="1852" y="-2273.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1777,-2238.5 1777,-2263.5
2061,-2263.5 2061,-2238.5 1777,-2238.5"/>
+<text text-anchor="start" x="1782" y="-2248.3"
font-family="Helvetica,sans-Serif" font-size="14.00">callback</text>
+<text text-anchor="start" x="1839" y="-2248.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1844" y="-2248.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
+<text text-anchor="start" x="1965" y="-2248.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1777,-2213.5 1777,-2238.5
2061,-2238.5 2061,-2213.5 1777,-2213.5"/>
+<text text-anchor="start" x="1782" y="-2223.3"
font-family="Helvetica,sans-Serif" font-size="14.00">callback_kwargs</text>
+<text text-anchor="start" x="1895" y="-2223.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1900" y="-2223.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
+<polygon fill="none" stroke="black" points="1777,-2188.5 1777,-2213.5
2061,-2213.5 2061,-2188.5 1777,-2188.5"/>
+<text text-anchor="start" x="1782" y="-2198.3"
font-family="Helvetica,sans-Serif" font-size="14.00">callback_state</text>
+<text text-anchor="start" x="1880" y="-2198.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1885" y="-2198.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(20)]</text>
+<polygon fill="none" stroke="black" points="1777,-2163.5 1777,-2188.5
2061,-2188.5 2061,-2163.5 1777,-2163.5"/>
+<text text-anchor="start" x="1782" y="-2173.3"
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
+<text text-anchor="start" x="1828" y="-2173.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1833" y="-2173.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<polygon fill="none" stroke="black" points="1777,-2138.5 1777,-2163.5
2061,-2163.5 2061,-2138.5 1777,-2138.5"/>
+<text text-anchor="start" x="1782" y="-2148.3"
font-family="Helvetica,sans-Serif" font-size="14.00">dagrun_id</text>
+<text text-anchor="start" x="1852" y="-2148.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1857" y="-2148.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1777,-2113.5 1777,-2138.5
2061,-2138.5 2061,-2113.5 1777,-2113.5"/>
+<text text-anchor="start" x="1782" y="-2123.3"
font-family="Helvetica,sans-Serif" font-size="14.00">deadline_time</text>
+<text text-anchor="start" x="1879" y="-2123.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1884" y="-2123.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="1980" y="-2123.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1777,-2088.5 1777,-2113.5
2061,-2113.5 2061,-2088.5 1777,-2088.5"/>
+<text text-anchor="start" x="1782" y="-2098.3"
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_id</text>
+<text text-anchor="start" x="1849" y="-2098.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1854" y="-2098.3"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
</g>
<!-- trigger--deadline -->
<g id="edge16" class="edge">
<title>trigger--deadline</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1663.18,-2216.49C1697.67,-2212.84 1734.31,-2208.96 1768.72,-2205.31"/>
-<text text-anchor="start" x="1737.72" y="-2194.11" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1663.18" y="-2205.29" font-family="Times,serif"
font-size="14.00">{0,1}</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1663.18,-2221.33C1697.67,-2218.78 1734.31,-2216.07 1768.72,-2213.53"/>
+<text text-anchor="start" x="1737.72" y="-2202.33" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1663.18" y="-2210.13" font-family="Times,serif"
font-size="14.00">{0,1}</text>
</g>
<!-- hitl_detail -->
<g id="node41" class="node">
@@ -1668,8 +1672,8 @@
<!-- dag--deadline -->
<g id="edge30" class="edge">
<title>dag--deadline</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M829.4,-1875.53C990.95,-1841.59 1197.83,-1811.55 1265,-1861.5
1355.49,-1928.8 1251.33,-2032.36 1338,-2104.5 1397.45,-2153.99 1607.56,-2130.65
1684,-2142.5 1711.54,-2146.77 1740.75,-2152.04 1768.87,-2157.49"/>
-<text text-anchor="start" x="1737.87" y="-2146.29" font-family="Times,serif"
font-size="14.00">0..N</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M829.4,-1875.53C990.95,-1841.59 1197.83,-1811.55 1265,-1861.5
1355.49,-1928.8 1251.33,-2032.36 1338,-2104.5 1397.45,-2153.99 1607.9,-2128.64
1684,-2142.5 1711.64,-2147.54 1740.85,-2154.05 1768.9,-2160.91"/>
+<text text-anchor="start" x="1737.9" y="-2149.71" font-family="Times,serif"
font-size="14.00">0..N</text>
<text text-anchor="start" x="829.4" y="-1864.33" font-family="Times,serif"
font-size="14.00">{0,1}</text>
</g>
<!-- dag_schedule_asset_name_reference -->
@@ -2119,9 +2123,9 @@
<!-- dag_run--dagrun_asset_event -->
<g id="edge36" class="edge">
<title>dag_run--dagrun_asset_event</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1667.01,-1539.94C1673.31,-1558.88 1679.05,-1577.81 1684,-1596.5
1764.94,-1901.85 1661.07,-2002.53 1757,-2303.5 1786.42,-2395.82
1848.07,-2491.58 1885.97,-2545.26"/>
-<text text-anchor="start" x="1854.97" y="-2534.06" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1667.01" y="-1528.74" font-family="Times,serif"
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1667,-1539.65C1673.32,-1558.68 1679.06,-1577.71 1684,-1596.5
1767.32,-1913.11 1655.49,-2018.25 1757,-2329.5 1785.83,-2417.89
1846.46,-2508.63 1884.53,-2560.33"/>
+<text text-anchor="start" x="1853.53" y="-2549.13" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1667" y="-1528.45" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- dag_run--task_instance -->
<g id="edge37" class="edge">
@@ -2140,8 +2144,8 @@
<!-- dag_run--deadline -->
<g id="edge39" class="edge">
<title>dag_run--deadline</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1667.08,-1388.7C1688.66,-1422.11 1707.89,-1458.02 1721,-1494.5
1764.66,-1616.03 1694.38,-1961.56 1757,-2074.5 1760.5,-2080.81 1764.51,-2086.86
1768.92,-2092.64"/>
-<text text-anchor="start" x="1737.92" y="-2081.44" font-family="Times,serif"
font-size="14.00">0..N</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1667.08,-1388.7C1688.66,-1422.11 1707.89,-1458.02 1721,-1494.5
1764.66,-1616.03 1697.31,-1959.99 1757,-2074.5 1760.47,-2081.15
1764.45,-2087.57 1768.84,-2093.75"/>
+<text text-anchor="start" x="1737.84" y="-2082.55" font-family="Times,serif"
font-size="14.00">0..N</text>
<text text-anchor="start" x="1667.08" y="-1377.5" font-family="Times,serif"
font-size="14.00">{0,1}</text>
</g>
<!-- backfill_dag_run -->
diff --git a/airflow-core/docs/migrations-ref.rst
b/airflow-core/docs/migrations-ref.rst
index 508b588ec7e..dce45f2cb0a 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description
|
+=========================+==================+===================+==============================================================+
-| ``09fa89ba1710`` (head) | ``40f7c30a228b`` | ``3.1.0`` | Add
trigger_id to deadline. |
+| ``f56f68b9e02f`` (head) | ``09fa89ba1710`` | ``3.1.0`` | Add
callback_state to deadline. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``09fa89ba1710`` | ``40f7c30a228b`` | ``3.1.0`` | Add
trigger_id to deadline. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``40f7c30a228b`` | ``5d3072c51bac`` | ``3.1.0`` | Add Human
In the Loop Detail table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git
a/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py
b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py
new file mode 100644
index 00000000000..cd66ae74fe0
--- /dev/null
+++
b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add callback_state to deadline.
+
+Revision ID: f56f68b9e02f
+Revises: 09fa89ba1710
+Create Date: 2025-07-22 17:46:40.122517
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "f56f68b9e02f"
+down_revision = "09fa89ba1710"
+branch_labels = None
+depends_on = None
+airflow_version = "3.1.0"
+
+
+def upgrade():
+ """Add callback_state to deadline."""
+ with op.batch_alter_table("deadline", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("callback_state", sa.String(length=20),
nullable=True))
+ batch_op.drop_index(batch_op.f("deadline_time_idx"))
+ batch_op.create_index(
+ "deadline_callback_state_time_idx", ["callback_state",
"deadline_time"], unique=False
+ )
+
+
+def downgrade():
+ """Remove callback_state from deadline."""
+ with op.batch_alter_table("deadline", schema=None) as batch_op:
+ batch_op.drop_index("deadline_callback_state_time_idx")
+ batch_op.create_index(batch_op.f("deadline_time_idx"),
["deadline_time"], unique=False)
+ batch_op.drop_column("callback_state")
diff --git a/airflow-core/src/airflow/models/deadline.py
b/airflow-core/src/airflow/models/deadline.py
index a9e8cdabbf0..ae8b8a79f9a 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -20,6 +20,7 @@ import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
+from enum import Enum
from typing import TYPE_CHECKING, Any
import sqlalchemy_jsonfield
@@ -33,7 +34,7 @@ from airflow._shared.timezones import timezone
from airflow.models import Trigger
from airflow.models.base import Base, StringID
from airflow.settings import json
-from airflow.triggers.deadline import DeadlineCallbackTrigger
+from airflow.triggers.deadline import PAYLOAD_STATUS_KEY,
DeadlineCallbackTrigger
from airflow.utils.decorators import classproperty
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
@@ -48,6 +49,14 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+class DeadlineCallbackState(str, Enum):
+ """All possible states of deadline callbacks."""
+
+ QUEUED = "queued"
+ SUCCESS = "success"
+ FAILED = "failed"
+
+
class Deadline(Base):
"""A Deadline is a 'need-by' date which triggers a callback if the
provided time has passed."""
@@ -65,6 +74,8 @@ class Deadline(Base):
callback = Column(String(500), nullable=False)
# Serialized kwargs to pass to the callback.
callback_kwargs = Column(sqlalchemy_jsonfield.JSONField(json=json))
+ # The state of the deadline callback
+ callback_state = Column(String(20))
dagrun = relationship("DagRun", back_populates="deadlines")
@@ -72,7 +83,7 @@ class Deadline(Base):
trigger_id = Column(Integer, ForeignKey("trigger.id"), nullable=True)
trigger = relationship("Trigger", back_populates="deadline")
- __table_args__ = (Index("deadline_time_idx", deadline_time, unique=False),)
+ __table_args__ = (Index("deadline_callback_state_time_idx",
callback_state, deadline_time, unique=False),)
def __init__(
self,
@@ -115,7 +126,6 @@ class Deadline(Base):
NOTE: This should only be used to remove deadlines which are
associated with
successful DagRuns. If the deadline was missed, it will be handled
by the
scheduler.
- TODO: Create the missed_deadlines table (Ramit)
:param conditions: Dictionary of conditions to evaluate against.
:param session: Session to use.
@@ -159,7 +169,7 @@ class Deadline(Base):
return deleted_count
def handle_miss(self, session: Session):
- """Handle a missed deadline by queueing the callback and marking the
deadline as missed."""
+ """Handle a missed deadline by creating a trigger to run the
callback."""
# TODO: check to see if the callback is meant to run in triggerer or
executor. For now, the code below assumes it's for the triggerer
callback_trigger = DeadlineCallbackTrigger(
callback_path=self.callback,
@@ -170,17 +180,19 @@ class Deadline(Base):
session.add(trigger_orm)
session.flush()
self.trigger_id = trigger_orm.id
+ self.callback_state = DeadlineCallbackState.QUEUED
session.add(self)
- # TODO mark deadline as missed
-
def handle_callback_event(self, event: TriggerEvent, session: Session):
- if event.payload["status"] == "success":
- logger.debug("Deadline callback succeeded")
+ if (status := event.payload.get(PAYLOAD_STATUS_KEY)) and status in {
+ DeadlineCallbackState.SUCCESS,
+ DeadlineCallbackState.FAILED,
+ }:
self.trigger = None
+ self.callback_state = event.payload[PAYLOAD_STATUS_KEY]
session.add(self)
else:
- logger.error("Unexpected event received: %s", event)
+ logger.error("Unexpected event received: %s", event.payload)
class ReferenceModels:
diff --git a/airflow-core/src/airflow/triggers/deadline.py
b/airflow-core/src/airflow/triggers/deadline.py
index c65f173baf1..d04910dd8b8 100644
--- a/airflow-core/src/airflow/triggers/deadline.py
+++ b/airflow-core/src/airflow/triggers/deadline.py
@@ -24,7 +24,10 @@ from typing import Any
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils.module_loading import import_string
-logger = logging.getLogger(__name__)
+log = logging.getLogger(__name__)
+
+PAYLOAD_STATUS_KEY = "state"
+PAYLOAD_BODY_KEY = "body"
class DeadlineCallbackTrigger(BaseTrigger):
@@ -42,6 +45,21 @@ class DeadlineCallbackTrigger(BaseTrigger):
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- callback = import_string(self.callback_path)
- result = await callback(**self.callback_kwargs)
- yield TriggerEvent({"status": "success", "result": result})
+ from airflow.models.deadline import DeadlineCallbackState # to avoid
cyclic imports
+
+ try:
+ callback = import_string(self.callback_path)
+ result = await callback(**self.callback_kwargs)
+ log.info("Deadline callback completed with return value: %s",
result)
+ yield TriggerEvent({PAYLOAD_STATUS_KEY:
DeadlineCallbackState.SUCCESS, PAYLOAD_BODY_KEY: result})
+ except Exception as e:
+ if isinstance(e, ImportError):
+ message = "Could not import deadline callback on the triggerer"
+ elif isinstance(e, TypeError) and "await" in str(e):
+ message = "Deadline callback not awaitable"
+ else:
+ message = "An error occurred while executing deadline callback"
+ log.exception("%s: %s", message, e)
+ yield TriggerEvent(
+ {PAYLOAD_STATUS_KEY: DeadlineCallbackState.FAILED,
PAYLOAD_BODY_KEY: f"{message}: {e}"}
+ )
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 39ddc42b339..d6e15757460 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -93,7 +93,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"2.10.3": "5f2621c13b39",
"3.0.0": "29ce7909c52b",
"3.0.3": "fe199e1abd77",
- "3.1.0": "09fa89ba1710",
+ "3.1.0": "f56f68b9e02f",
}
diff --git a/airflow-core/tests/unit/models/test_deadline.py
b/airflow-core/tests/unit/models/test_deadline.py
index ae529f45e77..ef656f7455a 100644
--- a/airflow-core/tests/unit/models/test_deadline.py
+++ b/airflow-core/tests/unit/models/test_deadline.py
@@ -26,10 +26,11 @@ from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from airflow.models import DagRun, Trigger
-from airflow.models.deadline import Deadline, ReferenceModels, _fetch_from_db
+from airflow.models.deadline import Deadline, DeadlineCallbackState,
ReferenceModels, _fetch_from_db
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.deadline import DeadlineReference
from airflow.triggers.base import TriggerEvent
+from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY
from airflow.utils.state import DagRunState
from tests_common.test_utils import db
@@ -210,13 +211,31 @@ class TestDeadline:
@pytest.mark.db_test
@pytest.mark.parametrize(
- "event, none_trigger_id_expected",
+ "event, none_trigger_expected",
[
- pytest.param(TriggerEvent({"status": "success"}), True,
id="success_event"),
- pytest.param(TriggerEvent({"status": "some status"}), False,
id="unknown_event"),
+ pytest.param(
+ TriggerEvent(
+ {PAYLOAD_STATUS_KEY: DeadlineCallbackState.SUCCESS,
PAYLOAD_BODY_KEY: "test_result"}
+ ),
+ True,
+ id="success_event",
+ ),
+ pytest.param(
+ TriggerEvent(
+ {PAYLOAD_STATUS_KEY: DeadlineCallbackState.FAILED,
PAYLOAD_BODY_KEY: "RuntimeError"}
+ ),
+ True,
+ id="failed_event",
+ ),
+ pytest.param(
+ TriggerEvent({PAYLOAD_STATUS_KEY:
DeadlineCallbackState.QUEUED, PAYLOAD_BODY_KEY: ""}),
+ False,
+ id="invalid_event",
+ ),
+ pytest.param(TriggerEvent({PAYLOAD_STATUS_KEY: "unknown_state"}),
False, id="unknown_event"),
],
)
- def test_handle_callback_event(self, dagrun, session, event,
none_trigger_id_expected):
+ def test_handle_callback_event(self, dagrun, session, event,
none_trigger_expected):
deadline_orm = Deadline(
deadline_time=DEFAULT_DATE,
callback=TEST_CALLBACK_PATH,
@@ -231,10 +250,14 @@ class TestDeadline:
deadline_orm.handle_callback_event(event, session)
session.flush()
- if none_trigger_id_expected:
- assert deadline_orm.trigger_id is None
+
+ assert none_trigger_expected == (deadline_orm.trigger is None)
+
+ status = event.payload[PAYLOAD_STATUS_KEY]
+ if status in set(DeadlineCallbackState):
+ assert deadline_orm.callback_state == status
else:
- assert deadline_orm.trigger_id is not None
+ assert deadline_orm.callback_state == DeadlineCallbackState.QUEUED
@pytest.mark.db_test
diff --git a/airflow-core/tests/unit/triggers/test_deadline.py
b/airflow-core/tests/unit/triggers/test_deadline.py
index e1d1d797f17..869ebffda85 100644
--- a/airflow-core/tests/unit/triggers/test_deadline.py
+++ b/airflow-core/tests/unit/triggers/test_deadline.py
@@ -21,13 +21,20 @@ from unittest import mock
import pytest
-from airflow.triggers.deadline import DeadlineCallbackTrigger
+from airflow.models.deadline import DeadlineCallbackState
+from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY,
DeadlineCallbackTrigger
TEST_CALLBACK_PATH = "classpath.test_callback_for_deadline"
TEST_CALLBACK_KWARGS = {"arg1": "value1"}
+TEST_TRIGGER = DeadlineCallbackTrigger(callback_path=TEST_CALLBACK_PATH,
callback_kwargs=TEST_CALLBACK_KWARGS)
class TestDeadlineCallbackTrigger:
+ @pytest.fixture
+ def mock_import_string(self):
+ with mock.patch("airflow.triggers.deadline.import_string") as m:
+ yield m
+
@pytest.mark.parametrize(
"callback_init_kwargs,expected_serialized_kwargs",
[
@@ -49,20 +56,28 @@ class TestDeadlineCallbackTrigger:
}
@pytest.mark.asyncio
- @mock.patch("airflow.triggers.deadline.import_string")
- async def test_run(self, mock_import_string):
+ async def test_run_success(self, mock_import_string):
callback_return_value = "some value"
mock_callback = mock.AsyncMock(return_value=callback_return_value)
mock_import_string.return_value = mock_callback
- trigger = DeadlineCallbackTrigger(
- callback_path=TEST_CALLBACK_PATH,
- callback_kwargs=TEST_CALLBACK_KWARGS,
- )
+ event = await TEST_TRIGGER.run().asend(None)
+
+ mock_import_string.assert_called_once_with(TEST_CALLBACK_PATH)
+ mock_callback.assert_called_once_with(**TEST_CALLBACK_KWARGS)
+
+ assert event.payload[PAYLOAD_STATUS_KEY] ==
DeadlineCallbackState.SUCCESS
+ assert event.payload[PAYLOAD_BODY_KEY] == callback_return_value
+
+ @pytest.mark.asyncio
+ async def test_run_failure(self, mock_import_string):
+ mock_callback = mock.AsyncMock(side_effect=RuntimeError("Something
went wrong"))
+ mock_import_string.return_value = mock_callback
- event = await trigger.run().asend(None)
+ event = await TEST_TRIGGER.run().asend(None)
mock_import_string.assert_called_once_with(TEST_CALLBACK_PATH)
mock_callback.assert_called_once_with(**TEST_CALLBACK_KWARGS)
- assert event.payload == {"status": "success", "result":
callback_return_value}
+ assert event.payload[PAYLOAD_STATUS_KEY] ==
DeadlineCallbackState.FAILED
+ assert PAYLOAD_BODY_KEY in event.payload