pierrejeambrun commented on code in PR #45062:
URL: https://github.com/apache/airflow/pull/45062#discussion_r1897349296


##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -206,3 +213,55 @@ def create_backfill(
             status_code=status.HTTP_409_CONFLICT,
             detail=f"There is already a running backfill for dag 
{backfill_request.dag_id}",
         )
+
+
+@backfills_router.post(
+    path="/dry_run",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+        ]
+    ),
+)
+def create_backfill_dry_run(
+    backfill_request: BackfillPostBody,
+    session: SessionDep,
+) -> BackfillDryRunResponse:
+    from_date = timezone.coerce_datetime(backfill_request.from_date)
+    to_date = timezone.coerce_datetime(backfill_request.to_date)
+    serdag = 
session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id))
+    if not serdag:
+        raise HTTPException(status_code=404, detail=f"Could not find dag 
{backfill_request.dag_id}")
+
+    info_list = _get_info_list(
+        dag=serdag.dag,
+        from_date=from_date,
+        to_date=to_date,
+        reverse=backfill_request.run_backwards,
+    )
+    backfill_response_item = []
+    for info in info_list:
+        dr = session.scalar(
+            select(DagRun)
+            .where(DagRun.logical_date == info.logical_date)
+            .order_by(nulls_first(desc(DagRun.start_date), session))
+            .limit(1)
+        )
+
+        if dr:
+            non_create_reason = None
+            if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
+                non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
+            elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE:
+                non_create_reason = 
BackfillDagRunExceptionReason.ALREADY_EXISTS
+            elif backfill_request.reprocess_behavior is 
ReprocessBehavior.FAILED:
+                if dr.state != DagRunState.FAILED:
+                    non_create_reason = 
BackfillDagRunExceptionReason.ALREADY_EXISTS
+            if not non_create_reason:
+                
backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date))
+
+        else:
+            
backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date))

Review Comment:
   I think this logic is duplicated from `_create_backfill_dag_run`.
   
   I don't think that is the best option, because it will cause problems when 
someone updates this function.
   
   Also this complex logic shouldn't be inside the router.
   
   I would recommend updating `_create_backfill` and `_create_backfill_dag_run` 
to support an extra `dry_run: bool = False` keyword argument that prevents 
adding objects to the session and committing it. Also `@overload` the signature 
to signal that the return type is different when `dry_run=True` (a list of 
`BackfillDagRun` objects, or something similar), leaving the signature for 
`dry_run=False` unchanged.
   
   This way no logic is duplicated and there is no chance that the 
webserver and the underlying logic of the backfill service get out of sync. We 
just need to call `_create_backfill` with `dry_run=True` to retrieve the 
list of appropriate objects. (This is very similar to how `DAG.clear` works.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to