This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b299c431631 Filter query to update the dag_run table with backfill
details, using a condition on dag_id (#50577)
b299c431631 is described below
commit b299c431631d7cc88ce7919c8ed481e3ed3cc38d
Author: Rahul Vats <[email protected]>
AuthorDate: Wed May 14 16:26:44 2025 +0530
Filter query to update the dag_run table with backfill details, using a
condition on dag_id (#50577)
---
airflow-core/src/airflow/models/backfill.py | 7 +++++--
airflow-core/tests/unit/models/test_backfill.py | 2 +-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py
index 7028cf12495..9514ff9e320 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -306,7 +306,10 @@ def _create_backfill_dag_run(
return
lock = session.execute(
with_row_locks(
- query=select(DagRun).where(DagRun.logical_date == info.logical_date),
+ query=select(DagRun).where(
+ DagRun.logical_date == info.logical_date,
+ DagRun.dag_id == dag.dag_id,
+ ),
session=session,
skip_locked=True,
)
@@ -410,7 +413,7 @@ def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal):
# Update backfill_id and run_type in DagRun table
session.execute(
update(DagRun)
- .where(DagRun.logical_date == info.logical_date)
+ .where(DagRun.logical_date == info.logical_date, DagRun.dag_id == dag.dag_id)
.values(
backfill_id=backfill_id,
run_type=DagRunType.BACKFILL_JOB,
diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py
index 2c52c51f429..d5e86cb949b 100644
--- a/airflow-core/tests/unit/models/test_backfill.py
+++ b/airflow-core/tests/unit/models/test_backfill.py
@@ -284,7 +284,7 @@ def test_reprocess_behavior(reprocess_behavior, num_in_b, exc_reasons, dag_maker
query = (
select(DagRun)
.join(BackfillDagRun.dag_run)
- .where(BackfillDagRun.backfill_id == b.id)
+ .where(BackfillDagRun.backfill_id == b.id, DagRun.dag_id == dag.dag_id)
.order_by(BackfillDagRun.sort_ordinal)
)
# these are all the dag runs that are part of this backfill