This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c6d6e007913 feat(cli): add `airflow dags clear` for partition-range reprocessing (#66004)
c6d6e007913 is described below

commit c6d6e007913ad8708344fc7275bfcc4ff4632ed0
Author: Wei Lee <[email protected]>
AuthorDate: Mon May 25 23:26:42 2026 +0800

    feat(cli): add `airflow dags clear` for partition-range reprocessing (#66004)
---
 airflow-core/src/airflow/cli/cli_config.py         |  49 ++++
 .../src/airflow/cli/commands/dag_command.py        |  78 +++++-
 .../tests/unit/cli/commands/test_dag_command.py    | 308 ++++++++++++++++++++-
 3 files changed, 432 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py
index df0c2c15fbe..b41ac481e0e 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -185,6 +185,30 @@ ARG_END_DATE = Arg(
     ),
     type=parsedate,
 )
+ARG_PARTITION_DATE_START = Arg(
+    ("--partition-date-start",),
+    help=(
+        "Inclusive lower bound of the partition_date window (matched against DagRun.partition_date). "
+        "Accepts the same datetime formats as --start-date."
+    ),
+    type=parsedate,
+)
+ARG_PARTITION_DATE_END = Arg(
+    ("--partition-date-end",),
+    help=(
+        "Inclusive upper bound of the partition_date window (matched against DagRun.partition_date). "
+        "Accepts the same datetime formats as --end-date."
+    ),
+    type=parsedate,
+)
+ARG_PARTITION_KEY = Arg(
+    ("--partition-key",),
+    help="Clear all Dag runs whose partition_key matches this exact value.",
+)
+ARG_CLEAR_RUN_ID = Arg(
+    ("--run-id",),
+    help="Clear the Dag run with this run_id.",
+)
 ARG_OUTPUT_PATH = Arg(
     (
         "-o",
@@ -1101,6 +1125,31 @@ DAGS_COMMANDS = (
         func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"),
         args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE),
     ),
+    ActionCommand(
+        name="clear",
+        help="Clear Dag runs selected by run_id, partition_key, or a partition_date window",
+        description=(
+            "Clear Dag runs of the given dag_id and re-queue them for reprocessing. Exactly one "
+            "of the following selectors must be provided: --run-id (single run); --partition-key "
+            "(every run with that exact partition_key); or a partition_date window via "
+            "--partition-date-start and/or --partition-date-end (inclusive on both ends). "
+            "Intended for partitioned Dags, whose runs are keyed by partition_date / "
+            "partition_key instead of logical_date. For traditional, non-partitioned Dags, use "
+            "`airflow tasks clear --start-date / --end-date`."
+        ),
+        func=lazy_load_command("airflow.cli.commands.dag_command.dag_clear"),
+        args=(
+            ARG_DAG_ID,
+            ARG_CLEAR_RUN_ID,
+            ARG_PARTITION_KEY,
+            ARG_PARTITION_DATE_START,
+            ARG_PARTITION_DATE_END,
+            ARG_ONLY_FAILED,
+            ARG_ONLY_RUNNING,
+            ARG_YES,
+            ARG_VERBOSE,
+        ),
+    ),
     ActionCommand(
         name="list",
         help="List all the DAGs",
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py
index 13a4ad90596..3a28e4f1b20 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -47,7 +47,12 @@ from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.timetables.base import TimeRestriction
 from airflow.utils import cli as cli_utils
-from airflow.utils.cli import get_bagged_dag, suppress_logs_and_warning, validate_dag_bundle_arg
+from airflow.utils.cli import (
+    get_bagged_dag,
+    get_db_dag,
+    suppress_logs_and_warning,
+    validate_dag_bundle_arg,
+)
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
 from airflow.utils.helpers import ask_yesno
 from airflow.utils.platform import getuser
@@ -117,6 +122,77 @@ def dag_delete(args) -> None:
         print("Cancelled")
 
 
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def dag_clear(args, session: Session = NEW_SESSION) -> None:
+    """Clear Dag runs selected by run_id, partition_key, or a partition_date window."""
+    has_range = args.partition_date_start is not None or args.partition_date_end is not None
+    selectors_used = sum([args.run_id is not None, args.partition_key is not None, has_range])
+    if selectors_used == 0:
+        raise SystemExit(
+            "One of --run-id, --partition-key, or --partition-date-start / --partition-date-end "
+            "must be provided."
+        )
+    if selectors_used > 1:
+        raise SystemExit(
+            "--run-id, --partition-key, and --partition-date-start / --partition-date-end are "
+            "mutually exclusive; provide exactly one selector."
+        )
+    if (
+        args.partition_date_start is not None
+        and args.partition_date_end is not None
+        and args.partition_date_start > args.partition_date_end
+    ):
+        raise SystemExit("--partition-date-start must be on or before --partition-date-end.")
+
+    dag = get_db_dag(bundle_names=None, dag_id=args.dag_id)
+
+    query = select(DagRun.run_id, DagRun.partition_key, DagRun.partition_date).where(
+        DagRun.dag_id == args.dag_id
+    )
+    if args.run_id is not None:
+        query = query.where(DagRun.run_id == args.run_id)
+    elif args.partition_key is not None:
+        query = query.where(DagRun.partition_key == args.partition_key)
+    else:
+        query = query.where(DagRun.partition_date.is_not(None))
+        if args.partition_date_start is not None:
+            query = query.where(DagRun.partition_date >= args.partition_date_start)
+        if args.partition_date_end is not None:
+            query = query.where(DagRun.partition_date <= args.partition_date_end)
+    query = query.order_by(DagRun.partition_date, DagRun.run_id)
+
+    runs = list(session.execute(query).all())
+    if not runs:
+        print("No matching Dag runs found.")
+        return
+
+    run_ids = [run.run_id for run in runs]
+    if not args.yes:
+        listing = "\n".join(
+            f"  {run.run_id}  partition_key={run.partition_key}  partition_date={run.partition_date}"
+            for run in runs
+        )
+        question = (
+            f"You are about to clear {len(runs)} Dag run(s) of {args.dag_id!r}:\n"
+            f"{listing}\n\nAre you sure? [y/n]"
+        )
+        if not ask_yesno(question):
+            print("Cancelled, nothing was cleared.")
+            return
+
+    cleared = 0
+    for run_id in run_ids:
+        cleared += dag.clear(
+            run_id=run_id,
+            only_failed=args.only_failed,
+            only_running=args.only_running,
+            session=session,
+        )
+    print(f"Cleared {cleared} task instance(s) across {len(run_ids)} Dag run(s).")
+
+
 @cli_utils.action_cli
 @providers_configuration_loaded
 def dag_pause(args) -> None:
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 183e80ee216..37178a85ef5 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -41,14 +41,16 @@ from airflow.exceptions import AirflowException
 from airflow.models import DagModel, DagRun
 from airflow.models.dagbag import DBDagBag
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.taskinstance import TaskInstance
+from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
-from airflow.sdk import DAG, BaseOperator, task
+from airflow.sdk import DAG, BaseOperator, CronPartitionTimetable, task
 from airflow.sdk.definitions.dag import _run_inline_trigger
 from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame
 from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG
 from airflow.triggers.base import TriggerEvent
 from airflow.utils.session import create_session
-from airflow.utils.state import DagRunState
+from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.config import conf_vars
@@ -1158,6 +1160,308 @@ class TestCliDagsReserialize:
         assert dag_processor_parsing_result.serialized_dags[0].hash == serialized_dag_hash[0]
 
 
+class TestCliDagsClear:
+    """Tests for the `airflow dags clear` partition-range subcommand."""
+
+    DAG_ID = "test_dags_clear_partitioned"
+
+    @pytest.fixture
+    def parser(self) -> argparse.ArgumentParser:
+        return cli_parser.get_parser()
+
+    @pytest.fixture(autouse=True)
+    def _clear_db(self):
+        yield
+        clear_db_runs()
+        clear_db_dags()
+
+    @pytest.fixture
+    def seeded_partitioned_runs(self, dag_maker):
+        with dag_maker(
+            self.DAG_ID,
+            schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC),
+            start_date=datetime(2026, 3, 1, tzinfo=pendulum.UTC),
+            catchup=True,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t1")
+        # Three partitioned runs, plus one unpartitioned run that must never be touched.
+        dag_maker.create_dagrun(
+            run_id="part_2026_03_08",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 3, 8, tzinfo=pendulum.UTC),
+            partition_key="2026-03-08T00:00:00",
+        )
+        dag_maker.create_dagrun(
+            run_id="part_2026_03_10",
+            state=DagRunState.FAILED,
+            logical_date=None,
+            partition_date=datetime(2026, 3, 10, tzinfo=pendulum.UTC),
+            partition_key="2026-03-10T00:00:00",
+        )
+        dag_maker.create_dagrun(
+            run_id="part_2026_03_14",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 3, 14, tzinfo=pendulum.UTC),
+            partition_key="2026-03-14T00:00:00",
+        )
+        dag_maker.create_dagrun(
+            run_id="non_partitioned",
+            state=DagRunState.SUCCESS,
+            logical_date=datetime(2026, 3, 9, tzinfo=pendulum.UTC),
+            partition_date=None,
+        )
+        dag_maker.sync_dagbag_to_db()
+
+    def _get_run_states(self):
+        with create_session() as session:
+            return {
+                row.run_id: row.state
+                for row in session.scalars(select(DagRun).where(DagRun.dag_id == self.DAG_ID)).all()
+            }
+
+    def test_requires_a_selector(self, parser):
+        args = parser.parse_args(["dags", "clear", self.DAG_ID, "--yes"])
+        with pytest.raises(SystemExit, match="One of --run-id, --partition-key"):
+            dag_command.dag_clear(args)
+
+    @pytest.mark.parametrize(
+        "extra_args",
+        [
+            pytest.param(
+                ["--run-id", "part_2026_03_10", "--partition-key", "2026-03-10T00:00:00"],
+                id="run-id+partition-key",
+            ),
+            pytest.param(
+                ["--run-id", "part_2026_03_10", "--partition-date-start", "2026-03-08T00:00:00"],
+                id="run-id+date-range",
+            ),
+            pytest.param(
+                ["--partition-key", "2026-03-10T00:00:00", "--partition-date-end", "2026-03-14T00:00:00"],
+                id="partition-key+date-range",
+            ),
+        ],
+    )
+    def test_rejects_multiple_selectors(self, parser, extra_args):
+        args = parser.parse_args(["dags", "clear", self.DAG_ID, "--yes", *extra_args])
+        with pytest.raises(SystemExit, match="mutually exclusive"):
+            dag_command.dag_clear(args)
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_rejects_inverted_window(self, parser):
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                self.DAG_ID,
+                "--partition-date-start",
+                "2026-03-14T00:00:00",
+                "--partition-date-end",
+                "2026-03-08T00:00:00",
+                "--yes",
+            ]
+        )
+        with pytest.raises(SystemExit, match="--partition-date-start must be on or before"):
+            dag_command.dag_clear(args)
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_clears_runs_in_window_inclusive(self, parser):
+        # Literal flag values from issue #65921: short ISO `YYYY-MM-DDTHH` form.
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                self.DAG_ID,
+                "--partition-date-start",
+                "2026-03-08T00",
+                "--partition-date-end",
+                "2026-03-14T23",
+                "--yes",
+            ]
+        )
+        dag_command.dag_clear(args)
+
+        states = self._get_run_states()
+        # Inclusive both ends: 03-08 and 03-14 boundary runs are cleared (state -> QUEUED).
+        assert states["part_2026_03_08"] == DagRunState.QUEUED
+        assert states["part_2026_03_10"] == DagRunState.QUEUED
+        assert states["part_2026_03_14"] == DagRunState.QUEUED
+        # Run with NULL partition_date is never matched.
+        assert states["non_partitioned"] == DagRunState.SUCCESS
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_open_lower_bound(self, parser):
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                self.DAG_ID,
+                "--partition-date-end",
+                "2026-03-09T00:00:00",
+                "--yes",
+            ]
+        )
+        dag_command.dag_clear(args)
+
+        states = self._get_run_states()
+        assert states["part_2026_03_08"] == DagRunState.QUEUED
+        assert states["part_2026_03_10"] == DagRunState.FAILED
+        assert states["part_2026_03_14"] == DagRunState.SUCCESS
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_open_upper_bound(self, parser):
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                self.DAG_ID,
+                "--partition-date-start",
+                "2026-03-13T00:00:00",
+                "--yes",
+            ]
+        )
+        dag_command.dag_clear(args)
+
+        states = self._get_run_states()
+        assert states["part_2026_03_08"] == DagRunState.SUCCESS
+        assert states["part_2026_03_10"] == DagRunState.FAILED
+        assert states["part_2026_03_14"] == DagRunState.QUEUED
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_no_matching_runs_is_a_no_op(self, parser, capsys):
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                self.DAG_ID,
+                "--partition-date-start",
+                "2027-01-01T00:00:00",
+                "--partition-date-end",
+                "2027-12-31T00:00:00",
+                "--yes",
+            ]
+        )
+        dag_command.dag_clear(args)
+        out = capsys.readouterr().out
+        assert "No matching Dag runs" in out
+        assert self._get_run_states()["part_2026_03_10"] == DagRunState.FAILED
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    @mock.patch("airflow.cli.commands.dag_command.ask_yesno", return_value=False)
+    def test_prompt_decline_does_not_clear(self, mock_ask, parser):
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                self.DAG_ID,
+                "--partition-date-start",
+                "2026-03-08T00:00:00",
+                "--partition-date-end",
+                "2026-03-14T00:00:00",
+            ]
+        )
+        dag_command.dag_clear(args)
+        mock_ask.assert_called_once()
+        states = self._get_run_states()
+        assert states["part_2026_03_08"] == DagRunState.SUCCESS
+        assert states["part_2026_03_10"] == DagRunState.FAILED
+        assert states["part_2026_03_14"] == DagRunState.SUCCESS
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_clears_by_run_id(self, parser):
+        args = parser.parse_args(["dags", "clear", self.DAG_ID, "--run-id", "part_2026_03_10", "--yes"])
+        dag_command.dag_clear(args)
+
+        states = self._get_run_states()
+        assert states["part_2026_03_08"] == DagRunState.SUCCESS
+        assert states["part_2026_03_10"] == DagRunState.QUEUED
+        assert states["part_2026_03_14"] == DagRunState.SUCCESS
+        assert states["non_partitioned"] == DagRunState.SUCCESS
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_clears_by_partition_key(self, parser):
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                self.DAG_ID,
+                "--partition-key",
+                "2026-03-10T00:00:00",
+                "--yes",
+            ]
+        )
+        dag_command.dag_clear(args)
+
+        states = self._get_run_states()
+        assert states["part_2026_03_08"] == DagRunState.SUCCESS
+        assert states["part_2026_03_10"] == DagRunState.QUEUED
+        assert states["part_2026_03_14"] == DagRunState.SUCCESS
+        assert states["non_partitioned"] == DagRunState.SUCCESS
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_run_id_not_found_is_a_no_op(self, parser, capsys):
+        args = parser.parse_args(["dags", "clear", self.DAG_ID, "--run-id", "does_not_exist", "--yes"])
+        dag_command.dag_clear(args)
+        assert "No matching Dag runs" in capsys.readouterr().out
+        assert self._get_run_states()["part_2026_03_10"] == DagRunState.FAILED
+
+    @pytest.mark.usefixtures("seeded_partitioned_runs")
+    def test_only_failed_skips_non_failed_task_instances(self, parser):
+        # Explicitly set TI states so we can assert selectively.
+        # part_2026_03_10 has a FAILED DagRun; mark its single TI as FAILED.
+        # part_2026_03_08 has a SUCCESS DagRun; mark its single TI as SUCCESS.
+        with create_session() as session:
+            for run_id, ti_state in [
+                ("part_2026_03_08", TaskInstanceState.SUCCESS),
+                ("part_2026_03_10", TaskInstanceState.FAILED),
+            ]:
+                session.execute(
+                    TaskInstance.__table__.update()
+                    .where(TaskInstance.dag_id == self.DAG_ID)
+                    .where(TaskInstance.run_id == run_id)
+                    .values(state=ti_state)
+                )
+
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                self.DAG_ID,
+                "--partition-date-start",
+                "2026-03-08T00:00:00",
+                "--partition-date-end",
+                "2026-03-14T00:00:00",
+                "--only-failed",
+                "--yes",
+            ]
+        )
+        dag_command.dag_clear(args)
+
+        states = self._get_run_states()
+        # part_2026_03_10 had a FAILED TI — its run should be re-queued.
+        assert states["part_2026_03_10"] == DagRunState.QUEUED
+        # part_2026_03_08 had no FAILED TI — its run state must be unchanged.
+        assert states["part_2026_03_08"] == DagRunState.SUCCESS
+        # Non-partitioned run is always untouched.
+        assert states["non_partitioned"] == DagRunState.SUCCESS
+
+    def test_missing_dag_raises(self, parser):
+        args = parser.parse_args(
+            [
+                "dags",
+                "clear",
+                "does_not_exist",
+                "--partition-date-start",
+                "2026-03-08T00:00:00",
+                "--yes",
+            ]
+        )
+        with pytest.raises(AirflowException, match="could not be found in the database"):
+            dag_command.dag_clear(args)
+
+
 class TestDagDetailsIsBackfillable:
     """Tests for the is_backfillable computation in _get_dagbag_dag_details."""
 

Reply via email to