This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7cec2a78ac8 Fix triggerer errors after Airflow 2 to 3 migration (#55884)
7cec2a78ac8 is described below
commit 7cec2a78ac8f511a1da94048d4dba8863751ec87
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Oct 22 23:23:01 2025 +0100
Fix triggerer errors after Airflow 2 to 3 migration (#55884)
* Fix triggerer errors after Airflow 2 to 3 migration
When upgrading from Airflow 2, existing deferred triggers can reference
TaskInstances without a dag_version_id and DagRuns with conf=None. This
caused errors when the triggerer tried to start those triggers and when
workers consumed ti_run responses.
This change:
1. Skips starting triggers whose TaskInstance lacks dag_version_id, logging
a warning instead of erroring
2. Coerces DagRun.conf from None to {} in the ti_run response for
compatibility with Airflow 2-era data
3. Adds unit tests covering both behaviors
This prevents triggerer crashes and makes deferred tasks resume reliably
after migration.
* Remove config check as that has been addressed in a different PR
* Add comment on why we added this
* Remove null conf test
---
.../src/airflow/jobs/triggerer_job_runner.py | 8 +++++-
airflow-core/tests/unit/jobs/test_triggerer_job.py | 32 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index f5bc2d94d06..617643c3f24 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -649,7 +649,13 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
)
if new_trigger_orm.task_instance:
log_path = render_log_fname(ti=new_trigger_orm.task_instance)
-
+ if not new_trigger_orm.task_instance.dag_version_id:
+ # This is to handle 2 to 3 upgrade where TI.dag_version_id can be none
+ log.warning(
+ "TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
+ ti_id=new_trigger_orm.task_instance.id,
+ )
+ continue
ser_ti = workloads.TaskInstance.model_validate(
new_trigger_orm.task_instance, from_attributes=True
)
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 1e6eb1b9d6f..14302e36a83 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1140,6 +1140,38 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple
assert trigger_orm2.id in trigger_ids
+def test_update_triggers_skips_when_ti_has_no_dag_version(session, supervisor_builder, dag_maker):
+ """
+ Ensure supervisor skips creating a trigger when the linked TaskInstance has no dag_version_id.
+ """
+ with dag_maker(dag_id="test_no_dag_version"):
+ EmptyOperator(task_id="t1")
+ dr = dag_maker.create_dagrun()
+ ti = dr.task_instances[0]
+
+ # Create a Trigger and link it to the TaskInstance
+ trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
+ trigger_orm = Trigger.from_object(trigger)
+ session.add(trigger_orm)
+ session.flush()
+
+ ti.trigger_id = trigger_orm.id
+ # Explicitly remove dag_version_id
+ ti.dag_version_id = None
+ session.merge(ti)
+ session.commit()
+
+ supervisor = supervisor_builder()
+
+ # Attempt to enqueue creation of this trigger
+ supervisor.update_triggers({trigger_orm.id})
+
+ # Assert that nothing was queued for creation and no subprocess writes happened
+ assert len(supervisor.creating_triggers) == 0
+ assert trigger_orm.id not in supervisor.running_triggers
+ supervisor.stdin.write.assert_not_called()
+
+
class TestTriggererMessageTypes:
def test_message_types_in_triggerer(self):
"""