This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 37c43ddd997 Fix backfill marked complete before DagRuns are created (#62561)
37c43ddd997 is described below

commit 37c43ddd99754b9e512a93ad0b35ced548b260c9
Author: Shivam Rastogi <[email protected]>
AuthorDate: Thu Apr 2 02:15:32 2026 -0700

    Fix backfill marked complete before DagRuns are created (#62561)
---
 airflow-core/newsfragments/62561.bugfix.rst        |   1 +
 .../src/airflow/jobs/scheduler_job_runner.py       |  11 +-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 259 ++++++++++++++++++++-
 3 files changed, 269 insertions(+), 2 deletions(-)

diff --git a/airflow-core/newsfragments/62561.bugfix.rst b/airflow-core/newsfragments/62561.bugfix.rst
new file mode 100644
index 00000000000..9b40b89e211
--- /dev/null
+++ b/airflow-core/newsfragments/62561.bugfix.rst
@@ -0,0 +1 @@
+Fix backfill marked complete before DagRuns are created; add age-based cleanup for orphaned backfills.
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index bda95bbf4b0..67006a7d8ee 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -71,7 +71,7 @@ from airflow.models.asset import (
     TaskInletAssetReference,
     TaskOutletAssetReference,
 )
-from airflow.models.backfill import Backfill
+from airflow.models.backfill import Backfill, BackfillDagRun
 from airflow.models.callback import Callback, CallbackType, ExecutorCallback
 from airflow.models.dag import DagModel
 from airflow.models.dag_version import DagVersion
@@ -1861,8 +1861,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED)
         now = timezone.utcnow()
         # todo: AIP-78 simplify this function to an update statement
+        initializing_cutoff = now - timedelta(minutes=2)
         query = select(Backfill).where(
             Backfill.completed_at.is_(None),
+            # Guard: backfill must have at least one association,
+            # otherwise it is still being set up (see #61375).
+            # Allow cleanup of orphaned backfills older than 2 minutes
+            # that failed during initialization and never got any associations.
+            or_(
+                exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
+                Backfill.created_at < initializing_cutoff,
+            ),
             ~exists(
                 select(DagRun.id).where(
                     and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states))
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 76183c7fd73..5ccbb7e4056 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -65,7 +65,7 @@ from airflow.models.asset import (
     AssetPartitionDagRun,
     PartitionedAssetKeyLog,
 )
-from airflow.models.backfill import Backfill, _create_backfill
+from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
 from airflow.models.callback import ExecutorCallback
 from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
 from airflow.models.dag_version import DagVersion
@@ -8792,6 +8792,263 @@ def test_mark_backfills_completed(dag_maker, session):
     assert b.completed_at.timestamp() > 0
 
 
+def test_mark_backfills_complete_skips_initializing_backfill(dag_maker, session):
+    clear_db_backfills()
+    dag_id = "test_backfill_race_lifecycle"
+    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
+        BashOperator(task_id="hi", bash_command="echo hi")
+    b = Backfill(
+        dag_id=dag_id,
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-03"),
+        max_active_runs=10,
+        dag_run_conf={},
+        reprocess_behavior=ReprocessBehavior.NONE,
+    )
+    session.add(b)
+    session.commit()
+    backfill_id = b.id
+    session.expunge_all()
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+    )
+    runner._mark_backfills_complete()
+    b = session.get(Backfill, backfill_id)
+    assert b.completed_at is None
+    session.expunge_all()
+    dr = DagRun(
+        dag_id=dag_id,
+        run_id="backfill__2021-01-01T00:00:00+00:00",
+        run_type=DagRunType.BACKFILL_JOB,
+        logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+        run_after=pendulum.parse("2021-01-02"),
+        state=DagRunState.SUCCESS,
+        backfill_id=backfill_id,
+    )
+    session.add(dr)
+    session.flush()
+    session.add(
+        BackfillDagRun(
+            backfill_id=backfill_id,
+            dag_run_id=dr.id,
+            logical_date=pendulum.parse("2021-01-01"),
+            sort_ordinal=1,
+        )
+    )
+    session.commit()
+    session.expunge_all()
+    runner._mark_backfills_complete()
+    b = session.get(Backfill, backfill_id)
+    assert b.completed_at is not None
+
+
+def test_mark_backfills_complete_cleans_orphan_after_cutoff(dag_maker, session):
+    """Backfill with no BackfillDagRun rows older than 2 minutes should be auto-completed."""
+    clear_db_backfills()
+    dag_id = "test_backfill_orphan_cleanup"
+    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
+        BashOperator(task_id="hi", bash_command="echo hi")
+    b = Backfill(
+        dag_id=dag_id,
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-03"),
+        max_active_runs=10,
+        dag_run_conf={},
+        reprocess_behavior=ReprocessBehavior.NONE,
+    )
+    session.add(b)
+    session.commit()
+    backfill_id = b.id
+    session.expunge_all()
+    # Travel 3 minutes into the future so the backfill is past the 2-minute cutoff
+    with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False):
+        runner = SchedulerJobRunner(
+            job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+        )
+        runner._mark_backfills_complete()
+    b = session.get(Backfill, backfill_id)
+    assert b.completed_at is not None
+
+
+def test_mark_backfills_complete_keeps_old_backfill_with_running_dagruns(dag_maker, session):
+    """Old backfill (>2 min) with running DagRuns must NOT be marked complete."""
+    clear_db_backfills()
+    dag_id = "test_backfill_old_with_runs"
+    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
+        BashOperator(task_id="hi", bash_command="echo hi")
+    b = Backfill(
+        dag_id=dag_id,
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-03"),
+        max_active_runs=10,
+        dag_run_conf={},
+        reprocess_behavior=ReprocessBehavior.NONE,
+    )
+    session.add(b)
+    session.commit()
+    backfill_id = b.id
+    dr = DagRun(
+        dag_id=dag_id,
+        run_id="backfill__2021-01-01T00:00:00+00:00",
+        run_type=DagRunType.BACKFILL_JOB,
+        logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+        run_after=pendulum.parse("2021-01-02"),
+        state=DagRunState.RUNNING,
+        backfill_id=backfill_id,
+    )
+    session.add(dr)
+    session.flush()
+    session.add(
+        BackfillDagRun(
+            backfill_id=backfill_id,
+            dag_run_id=dr.id,
+            logical_date=pendulum.parse("2021-01-01"),
+            sort_ordinal=1,
+        )
+    )
+    session.commit()
+    session.expunge_all()
+    # Travel 3 minutes into the future; backfill is old but has a RUNNING DagRun
+    with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False):
+        runner = SchedulerJobRunner(
+            job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+        )
+        runner._mark_backfills_complete()
+    b = session.get(Backfill, backfill_id)
+    assert b.completed_at is None
+
+
+def test_mark_backfills_complete_young_backfill_with_finished_runs(dag_maker, session):
+    """Young backfill (<2 min) with all SUCCESS DagRuns completes immediately."""
+    clear_db_backfills()
+    dag_id = "test_backfill_young_finished"
+    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
+        BashOperator(task_id="hi", bash_command="echo hi")
+    b = Backfill(
+        dag_id=dag_id,
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-03"),
+        max_active_runs=10,
+        dag_run_conf={},
+        reprocess_behavior=ReprocessBehavior.NONE,
+    )
+    session.add(b)
+    session.commit()
+    backfill_id = b.id
+    dr = DagRun(
+        dag_id=dag_id,
+        run_id="backfill__2021-01-01T00:00:00+00:00",
+        run_type=DagRunType.BACKFILL_JOB,
+        logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+        run_after=pendulum.parse("2021-01-02"),
+        state=DagRunState.SUCCESS,
+        backfill_id=backfill_id,
+    )
+    session.add(dr)
+    session.flush()
+    session.add(
+        BackfillDagRun(
+            backfill_id=backfill_id,
+            dag_run_id=dr.id,
+            logical_date=pendulum.parse("2021-01-01"),
+            sort_ordinal=1,
+        )
+    )
+    session.commit()
+    session.expunge_all()
+    # No time travel — backfill was just created, should still complete
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+    )
+    runner._mark_backfills_complete()
+    b = session.get(Backfill, backfill_id)
+    assert b.completed_at is not None
+
+
+def test_mark_backfills_complete_multiple_independent(dag_maker, session):
+    """Two backfills: one finished, one running — only the finished one completes."""
+    clear_db_backfills()
+    with dag_maker(serialized=True, dag_id="dag_finished", schedule="@daily"):
+        BashOperator(task_id="hi", bash_command="echo hi")
+    with dag_maker(serialized=True, dag_id="dag_running", schedule="@daily"):
+        BashOperator(task_id="hi", bash_command="echo hi")
+    b_finished = Backfill(
+        dag_id="dag_finished",
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-03"),
+        max_active_runs=10,
+        dag_run_conf={},
+        reprocess_behavior=ReprocessBehavior.NONE,
+    )
+    b_running = Backfill(
+        dag_id="dag_running",
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-03"),
+        max_active_runs=10,
+        dag_run_conf={},
+        reprocess_behavior=ReprocessBehavior.NONE,
+    )
+    session.add_all([b_finished, b_running])
+    session.commit()
+    finished_id = b_finished.id
+    running_id = b_running.id
+    # Finished backfill: SUCCESS DagRun
+    dr1 = DagRun(
+        dag_id="dag_finished",
+        run_id="backfill__2021-01-01T00:00:00+00:00",
+        run_type=DagRunType.BACKFILL_JOB,
+        logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+        run_after=pendulum.parse("2021-01-02"),
+        state=DagRunState.SUCCESS,
+        backfill_id=finished_id,
+    )
+    session.add(dr1)
+    session.flush()
+    session.add(
+        BackfillDagRun(
+            backfill_id=finished_id,
+            dag_run_id=dr1.id,
+            logical_date=pendulum.parse("2021-01-01"),
+            sort_ordinal=1,
+        )
+    )
+    # Running backfill: RUNNING DagRun
+    dr2 = DagRun(
+        dag_id="dag_running",
+        run_id="backfill__2021-01-01T00:00:00+00:00",
+        run_type=DagRunType.BACKFILL_JOB,
+        logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+        run_after=pendulum.parse("2021-01-02"),
+        state=DagRunState.RUNNING,
+        backfill_id=running_id,
+    )
+    session.add(dr2)
+    session.flush()
+    session.add(
+        BackfillDagRun(
+            backfill_id=running_id,
+            dag_run_id=dr2.id,
+            logical_date=pendulum.parse("2021-01-01"),
+            sort_ordinal=1,
+        )
+    )
+    session.commit()
+    session.expunge_all()
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+    )
+    runner._mark_backfills_complete()
+    b_finished = session.get(Backfill, finished_id)
+    b_running = session.get(Backfill, running_id)
+    assert b_finished.completed_at is not None
+    assert b_running.completed_at is None
+
+
 class Key1Mapper(CorePartitionMapper):
     """Partition Mapper that returns only key-1 as downstream key"""
 

Reply via email to