Lee-W commented on code in PR #68702:
URL: https://github.com/apache/airflow/pull/68702#discussion_r3453571892
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -150,6 +155,91 @@ def perform_clear_dag_run(
return dag_run_cleared
+_TI_CHUNK_SIZE = 500
+
+
+def clear_partition_fields(
+ *,
+ dag: SerializedDAG,
+ body: ClearPartitionsBody,
+ dag_id: str,
+ session: Session,
+) -> tuple[int, int]:
+ """
+ Reset partition_key and partition_date to None on matching runs.
+
+ Returns (dag_runs_cleared, task_instances_cleared).
+ Mirrors ``airflow partitions clear`` column-reset behavior.
+ """
+ stmt = select(DagRun).where(DagRun.dag_id == dag_id)
+ if body.run_id is not None:
+ stmt = stmt.where(DagRun.run_id == body.run_id)
+ elif body.partition_key is not None:
+ stmt = stmt.where(DagRun.partition_key == body.partition_key)
+ else:
+ stmt = stmt.where(or_(DagRun.partition_key.is_not(None),
DagRun.partition_date.is_not(None)))
+ if body.partition_date_start is not None:
+ lower =
dag.timetable.resolve_day_bound(body.partition_date_start.date())
+ stmt = stmt.where(DagRun.partition_date >= lower)
+ if body.partition_date_end is not None:
+ upper =
dag.timetable.resolve_day_bound(body.partition_date_end.date() +
timedelta(days=1))
+ stmt = stmt.where(DagRun.partition_date < upper)
+ stmt = stmt.order_by(DagRun.partition_date, DagRun.run_id)
+
+ dag_runs_cleared = 0
+ # Buffers for batched TI fetching — mirrors _flush_buffer in
partition_command.py
+ ti_buffer_run_ids: list[str] = []
+ ti_carry: list[TaskInstance] = []
+ tis_cleared_total = 0
+
+ def _flush_ti_buffer(*, drain: bool = False) -> int:
+ flushed = 0
+ if ti_buffer_run_ids:
+ chunk_tis = list(
+
session.scalars(select(TaskInstance).where(TaskInstance.run_id.in_(ti_buffer_run_ids)))
Review Comment:
This filter was also missing in the CLI. I have now added
`TaskInstance.dag_id == dag_id` to the newly added shared function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]