jason810496 commented on code in PR #66004:
URL: https://github.com/apache/airflow/pull/66004#discussion_r3296708227


##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -117,6 +122,70 @@ def dag_delete(args) -> None:
         print("Cancelled")
 
 
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def dag_clear(args, session: Session = NEW_SESSION) -> None:
+    """Clear Dag runs selected by run_id, partition_key, or a partition_date 
window."""
+    has_range = args.partition_date_start is not None or 
args.partition_date_end is not None
+    selectors_used = sum([args.run_id is not None, args.partition_key is not 
None, has_range])
+    if selectors_used == 0:
+        raise SystemExit(
+            "One of --run-id, --partition-key, or --partition-date-start / 
--partition-date-end "
+            "must be provided."
+        )
+    if selectors_used > 1:
+        raise SystemExit(
+            "--run-id, --partition-key, and --partition-date-start / 
--partition-date-end are "
+            "mutually exclusive; provide exactly one selector."
+        )
+    if (
+        args.partition_date_start is not None
+        and args.partition_date_end is not None
+        and args.partition_date_start > args.partition_date_end
+    ):
+        raise SystemExit("--partition-date-start must be on or before 
--partition-date-end.")
+
+    dag = get_db_dag(bundle_names=None, dag_id=args.dag_id)
+
+    query = select(DagRun).where(DagRun.dag_id == args.dag_id)

Review Comment:
   We only need to select three columns (`run_id`, `partition_key`, `partition_date`) rather than the full `DagRun` row.



##########
airflow-core/src/airflow/cli/cli_config.py:
##########
@@ -185,6 +185,26 @@ def string_lower_type(val):
     ),
     type=parsedate,
 )
+ARG_PARTITION_DATE_START = Arg(
+    ("--partition-date-start",),
+    help=(
+        "Inclusive lower bound of the partition_date window (matched against 
DagRun.partition_date). "
+        "Accepts the same datetime formats as --start-date."
+    ),
+    type=parsedate,
+)
+ARG_PARTITION_DATE_END = Arg(
+    ("--partition-date-end",),
+    help=(
+        "Inclusive upper bound of the partition_date window (matched against 
DagRun.partition_date). "
+        "Accepts the same datetime formats as --end-date."
+    ),
+    type=parsedate,
+)
+ARG_PARTITION_KEY = Arg(
+    ("--partition-key",),
+    help="Clear all Dag runs whose partition_key matches this exact value.",
+)

Review Comment:
   > Honor --only-failed / --only-running to scope which TIs get reset.
   
   This item from the PR description is not reflected in the CLI args here, so it seems we need to update the PR description before merging.



##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -117,6 +122,70 @@ def dag_delete(args) -> None:
         print("Cancelled")
 
 
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def dag_clear(args, session: Session = NEW_SESSION) -> None:
+    """Clear Dag runs selected by run_id, partition_key, or a partition_date 
window."""
+    has_range = args.partition_date_start is not None or 
args.partition_date_end is not None
+    selectors_used = sum([args.run_id is not None, args.partition_key is not 
None, has_range])
+    if selectors_used == 0:
+        raise SystemExit(
+            "One of --run-id, --partition-key, or --partition-date-start / 
--partition-date-end "
+            "must be provided."
+        )
+    if selectors_used > 1:
+        raise SystemExit(
+            "--run-id, --partition-key, and --partition-date-start / 
--partition-date-end are "
+            "mutually exclusive; provide exactly one selector."
+        )
+    if (
+        args.partition_date_start is not None
+        and args.partition_date_end is not None
+        and args.partition_date_start > args.partition_date_end
+    ):
+        raise SystemExit("--partition-date-start must be on or before 
--partition-date-end.")
+
+    dag = get_db_dag(bundle_names=None, dag_id=args.dag_id)
+
+    query = select(DagRun).where(DagRun.dag_id == args.dag_id)
+    if args.run_id is not None:
+        query = query.where(DagRun.run_id == args.run_id)
+    elif args.partition_key is not None:
+        query = query.where(DagRun.partition_key == args.partition_key)
+    else:
+        query = query.where(DagRun.partition_date.is_not(None))
+        if args.partition_date_start is not None:
+            query = query.where(DagRun.partition_date >= 
args.partition_date_start)
+        if args.partition_date_end is not None:
+            query = query.where(DagRun.partition_date <= 
args.partition_date_end)
+    query = query.order_by(DagRun.partition_date, DagRun.run_id)
+
+    runs = list(session.scalars(query).all())
+    if not runs:
+        print("No matching Dag runs found.")
+        return
+
+    run_ids = [run.run_id for run in runs]
+    if not args.yes:
+        listing = "\n".join(
+            f"  {run.run_id}  partition_key={run.partition_key}  
partition_date={run.partition_date}"
+            for run in runs
+        )
+        question = (
+            f"You are about to clear {len(runs)} Dag run(s) of 
{args.dag_id!r}:\n"
+            f"{listing}\n\nAre you sure? [y/n]"
+        )
+        if not ask_yesno(question):
+            print("Cancelled, nothing was cleared.")
+            return
+
+    cleared = 0
+    for run_id in run_ids:
+        cleared += dag.clear(run_id=run_id, session=session)

Review Comment:
   > Today the only ways to bulk-reprocess such runs are N API calls or N UI 
clicks.
   
   Even though this no longer requires `N` API calls, it's worthwhile to open a new issue to track a "real" bulk clear that does **not** require `N` transactions.
   
   Since `dag.clear` opens a new DB transaction for each run, this might not be ideal when clearing hundreds of Dag runs (but that's out of scope for this PR).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to