pierrejeambrun commented on code in PR #45062:
URL: https://github.com/apache/airflow/pull/45062#discussion_r1897349296
##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -206,3 +213,55 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag
{backfill_request.dag_id}",
)
+
+
+@backfills_router.post(
+ path="/dry_run",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
+)
+def create_backfill_dry_run(
+ backfill_request: BackfillPostBody,
+ session: SessionDep,
+) -> BackfillDryRunResponse:
+ from_date = timezone.coerce_datetime(backfill_request.from_date)
+ to_date = timezone.coerce_datetime(backfill_request.to_date)
+ serdag =
session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id))
+ if not serdag:
+ raise HTTPException(status_code=404, detail=f"Could not find dag
{backfill_request.dag_id}")
+
+ info_list = _get_info_list(
+ dag=serdag.dag,
+ from_date=from_date,
+ to_date=to_date,
+ reverse=backfill_request.run_backwards,
+ )
+ backfill_response_item = []
+ for info in info_list:
+ dr = session.scalar(
+ select(DagRun)
+ .where(DagRun.logical_date == info.logical_date)
+ .order_by(nulls_first(desc(DagRun.start_date), session))
+ .limit(1)
+ )
+
+ if dr:
+ non_create_reason = None
+ if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
+ non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
+ elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE:
+ non_create_reason =
BackfillDagRunExceptionReason.ALREADY_EXISTS
+ elif backfill_request.reprocess_behavior is
ReprocessBehavior.FAILED:
+ if dr.state != DagRunState.FAILED:
+ non_create_reason =
BackfillDagRunExceptionReason.ALREADY_EXISTS
+ if not non_create_reason:
+
backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date))
+
+ else:
+
backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date))
Review Comment:
I think this logic is duplicated from `_create_backfill_dag_run`.
I don't think that is the best option because this will cause problem when
someone update this function.
Also this complex logic shouldn't be inside the router.
I would recommend updating `_create_backfill` and `_create_backfill_dag_run`
to support an extra `dry_run: bool = False` keyword that will prevent adding
and committing the session. Also `@overload` the signature to signal that the
return type when `dry_run: true` is different and it's a list of maybe
`BackfillDagRun` or anything else, to leave the signature of when `dry_run:
false` unchanged.
This way no logic will be duplicated and there is no chance that the
webserver and the underlying logic of the backfill service get de-synced. We
just need to call `_create_backfill` with `dry_run=True` and we retrieve the
list of appropriate objects. (This is very similar to how we `DAG.clear` works)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]