This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new d4b48ac4ea6 Validate dag run conf in backfill dry-run (#66196) (#66935)
d4b48ac4ea6 is described below

commit d4b48ac4ea6e46abf2940793923f73646e2acaf3
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 10:11:12 2026 +0530

    Validate dag run conf in backfill dry-run (#66196) (#66935)
    
    * Add dry-run backfill invalid conf regression tests
    
    * Validate dag run conf during backfill dry run
    
    (cherry picked from commit c533ba1a6d80800194298a35e95abda759b326a8)
    
    Co-authored-by: Sangun-Lee-6 <[email protected]>
---
 .../core_api/routes/public/backfills.py            |  1 +
 airflow-core/src/airflow/models/backfill.py        |  3 +-
 .../core_api/routes/public/test_backfills.py       | 33 ++++++++++++++++++++++
 airflow-core/tests/unit/models/test_backfill.py    | 25 ++++++++++++++++
 4 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
index e920c85f3c1..5d4e112df18 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -291,6 +291,7 @@ def create_backfill_dry_run(
             to_date=to_date,
             reverse=body.run_backwards,
             reprocess_behavior=body.reprocess_behavior,
+            dag_run_conf=body.dag_run_conf,
             session=session,
         )
         backfills = [
diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py
index 2061e91c1f9..24c102f1cfc 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -288,6 +288,7 @@ def _do_dry_run(
     reverse: bool,
     reprocess_behavior: ReprocessBehavior,
     session: Session,
+    dag_run_conf: dict | None = None,
 ) -> Iterable[DagRunInfo]:
     from airflow.models import DagModel
     from airflow.models.serialized_dag import SerializedDagModel
@@ -296,7 +297,7 @@ def _do_dry_run(
     if not serdag:
         raise DagNotFound(f"Could not find Dag {dag_id}")
     dag = serdag.dag
-    _validate_backfill_params(dag, reverse, from_date, to_date, reprocess_behavior)
+    _validate_backfill_params(dag, reverse, from_date, to_date, reprocess_behavior, dag_run_conf)
 
     no_schedule = session.scalar(
         select(func.count()).where(DagModel.timetable_summary == "None", DagModel.dag_id == dag_id)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
index 10bfd0f7817..93f72c881a3 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
@@ -757,6 +757,39 @@ class TestCreateBackfillDryRun(TestBackfillEndpoint):
         response_json = response.json()
         assert response_json["backfills"] == expected_dates
 
+    def test_create_backfill_dry_run_rejects_invalid_conf(self, session, dag_maker, test_client):
+        from airflow.sdk import Param
+
+        with dag_maker(
+            session=session,
+            dag_id="TEST_DAG_2",
+            schedule="0 0 * * *",
+            start_date=pendulum.parse("2024-01-01"),
+            params={"validated_number": Param(1, type="integer", minimum=1, maximum=10)},
+        ) as dag:
+            EmptyOperator(task_id="mytask")
+
+        session.commit()
+
+        from_date = pendulum.parse("2024-01-01")
+        to_date = pendulum.parse("2024-01-05")
+
+        response = test_client.post(
+            url="/backfills/dry_run",
+            json={
+                "dag_id": dag.dag_id,
+                "from_date": to_iso(from_date),
+                "to_date": to_iso(to_date),
+                "max_active_runs": 5,
+                "run_backwards": False,
+                "dag_run_conf": {"validated_number": 99},
+                "reprocess_behavior": "none",
+            },
+        )
+
+        assert response.status_code == 422
+        assert "Invalid input for param validated_number" in response.json().get("detail")
+
     @pytest.mark.parametrize(
         ("repro_act", "repro_exp", "run_backwards", "status_code"),
         [
diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py
index ea1082eb52d..2749e0e6776 100644
--- a/airflow-core/tests/unit/models/test_backfill.py
+++ b/airflow-core/tests/unit/models/test_backfill.py
@@ -37,6 +37,7 @@ from airflow.models.backfill import (
     InvalidReprocessBehavior,
     ReprocessBehavior,
     _create_backfill,
+    _do_dry_run,
     _get_latest_dag_run_row_query,
 )
 from airflow.providers.standard.operators.python import PythonOperator
@@ -547,6 +548,30 @@ def test_backfill_rejects_invalid_conf(dag_maker, session):
     assert session.scalar(select(DagRun).where(DagRun.dag_id == dag.dag_id)) is None
 
 
+def test_do_dry_run_rejects_invalid_conf(dag_maker, session):
+    """Dry run with invalid conf should fail validation."""
+    from airflow.sdk import Param
+
+    with dag_maker(
+        schedule="@daily",
+        params={"validated_number": Param(1, type="integer", minimum=1, maximum=10)},
+    ) as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+
+    with pytest.raises(InvalidBackfillConf, match="Invalid input for param validated_number"):
+        list(
+            _do_dry_run(
+                dag_id=dag.dag_id,
+                from_date=pendulum.parse("2021-01-01"),
+                to_date=pendulum.parse("2021-01-05"),
+                reverse=False,
+                reprocess_behavior=ReprocessBehavior.NONE,
+                dag_run_conf={"validated_number": 99},
+                session=session,
+            )
+        )
+
+
 def test_params_stored_correctly(dag_maker, session):
     with dag_maker(schedule="@daily") as dag:
         PythonOperator(task_id="hi", python_callable=print)

Reply via email to