This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new b299c431631 Filter query to update the dag_run table with backfill details, using a condition on dag_id (#50577) b299c431631 is described below commit b299c431631d7cc88ce7919c8ed481e3ed3cc38d Author: Rahul Vats <43964496+vatsrahul1...@users.noreply.github.com> AuthorDate: Wed May 14 16:26:44 2025 +0530 Filter query to update the dag_run table with backfill details, using a condition on dag_id (#50577) --- airflow-core/src/airflow/models/backfill.py | 7 +++++-- airflow-core/tests/unit/models/test_backfill.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py index 7028cf12495..9514ff9e320 100644 --- a/airflow-core/src/airflow/models/backfill.py +++ b/airflow-core/src/airflow/models/backfill.py @@ -306,7 +306,10 @@ def _create_backfill_dag_run( return lock = session.execute( with_row_locks( - query=select(DagRun).where(DagRun.logical_date == info.logical_date), + query=select(DagRun).where( + DagRun.logical_date == info.logical_date, + DagRun.dag_id == dag.dag_id, + ), session=session, skip_locked=True, ) @@ -410,7 +413,7 @@ def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal): # Update backfill_id and run_type in DagRun table session.execute( update(DagRun) - .where(DagRun.logical_date == info.logical_date) + .where(DagRun.logical_date == info.logical_date, DagRun.dag_id == dag.dag_id) .values( backfill_id=backfill_id, run_type=DagRunType.BACKFILL_JOB, diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py index 2c52c51f429..d5e86cb949b 100644 --- a/airflow-core/tests/unit/models/test_backfill.py +++ b/airflow-core/tests/unit/models/test_backfill.py @@ -284,7 +284,7 @@ def test_reprocess_behavior(reprocess_behavior, num_in_b, exc_reasons, dag_maker query = ( select(DagRun) .join(BackfillDagRun.dag_run) - .where(BackfillDagRun.backfill_id == b.id) + .where(BackfillDagRun.backfill_id == b.id, DagRun.dag_id == dag.dag_id) .order_by(BackfillDagRun.sort_ordinal) ) # these are all the dag runs that are part of this backfill