pierrejeambrun commented on code in PR #45062:
URL: https://github.com/apache/airflow/pull/45062#discussion_r1897342655
##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -206,3 +213,55 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
+
+
+@backfills_router.post(
+ path="/dry_run",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
+)
+def create_backfill_dry_run(
+ backfill_request: BackfillPostBody,
Review Comment:
```suggestion
body: BackfillPostBody,
```
##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -206,3 +213,55 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
+
+
+@backfills_router.post(
+ path="/dry_run",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
+)
+def create_backfill_dry_run(
+ backfill_request: BackfillPostBody,
+ session: SessionDep,
+) -> BackfillDryRunResponse:
+ from_date = timezone.coerce_datetime(backfill_request.from_date)
+ to_date = timezone.coerce_datetime(backfill_request.to_date)
+ serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id))
+ if not serdag:
+ raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}")
+
+ info_list = _get_info_list(
+ dag=serdag.dag,
+ from_date=from_date,
+ to_date=to_date,
+ reverse=backfill_request.run_backwards,
+ )
+ backfill_response_item = []
+ for info in info_list:
+ dr = session.scalar(
+ select(DagRun)
+ .where(DagRun.logical_date == info.logical_date)
+ .order_by(nulls_first(desc(DagRun.start_date), session))
+ .limit(1)
+ )
+
+ if dr:
+ non_create_reason = None
+ if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
+ non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
+ elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE:
+ non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+ elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED:
+ if dr.state != DagRunState.FAILED:
+ non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+ if not non_create_reason:
+ backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date))
+
+ else:
+ backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date))
Review Comment:
I think this logic is duplicated from `_create_backfill_dag_run`.
I don't think that is the best option, because it will cause problems when
someone updates that function.
Also, logic this complex shouldn't live inside the router.
I would recommend updating `_create_backfill` and `_create_backfill_dag_run`
to support an extra `dry_run: bool` keyword that prevents adding to and
committing the session. Also `@overload` the signature to signal that the
return type is different when `dry_run=True` (a list of maybe
`BackfillDagRun` or anything else), leaving the signature for `dry_run=False`
unchanged.
This way no logic is duplicated and there is no chance that the
webserver and the underlying logic of the backfill service get out of sync. We
just need to call `_create_backfill` with `dry_run=True` and we retrieve the
list of appropriate objects. (This is very similar to how `DAG.clear` works.)
##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -206,3 +213,55 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
+
+
+@backfills_router.post(
+ path="/dry_run",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
+)
+def create_backfill_dry_run(
+ backfill_request: BackfillPostBody,
+ session: SessionDep,
+) -> BackfillDryRunResponse:
+ from_date = timezone.coerce_datetime(backfill_request.from_date)
+ to_date = timezone.coerce_datetime(backfill_request.to_date)
+ serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id))
+ if not serdag:
+ raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}")
+
+ info_list = _get_info_list(
+ dag=serdag.dag,
+ from_date=from_date,
+ to_date=to_date,
+ reverse=backfill_request.run_backwards,
Review Comment:
Shouldn't this come from the dag's `depends_on_past` setting?
##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -206,3 +213,55 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
+
+
+@backfills_router.post(
+ path="/dry_run",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
+)
+def create_backfill_dry_run(
+ backfill_request: BackfillPostBody,
+ session: SessionDep,
+) -> BackfillDryRunResponse:
+ from_date = timezone.coerce_datetime(backfill_request.from_date)
+ to_date = timezone.coerce_datetime(backfill_request.to_date)
+ serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id))
+ if not serdag:
+ raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}")
+
+ info_list = _get_info_list(
+ dag=serdag.dag,
+ from_date=from_date,
+ to_date=to_date,
+ reverse=backfill_request.run_backwards,
+ )
+ backfill_response_item = []
+ for info in info_list:
+ dr = session.scalar(
+ select(DagRun)
+ .where(DagRun.logical_date == info.logical_date)
+ .order_by(nulls_first(desc(DagRun.start_date), session))
+ .limit(1)
+ )
Review Comment:
I think we should change this pattern to make only one request to the db.
Maybe something like:
```python
# Use .in_() to build a SQL IN clause; Python's `in` operator does not work on columns.
dag_runs = ......where(DagRun.logical_date.in_([info.logical_date for info in ....]))
```
Queries to the db are expensive and we should avoid multiplying them.
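As a rough illustration of replacing the per-date lookups with one query, the sketch below uses the standard-library `sqlite3` module instead of SQLAlchemy so that it runs without dependencies. The `dag_run` table layout and the `latest_runs_by_date` helper are assumptions made for the example, and it does not reproduce the `nulls_first` ordering from the diff.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    "id INTEGER PRIMARY KEY, logical_date TEXT, start_date TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO dag_run (logical_date, start_date, state) VALUES (?, ?, ?)",
    [
        ("2024-01-01", "2024-01-01T01:00", "failed"),
        ("2024-01-01", "2024-01-01T02:00", "success"),
        ("2024-01-03", "2024-01-03T01:00", "running"),
    ],
)


def latest_runs_by_date(conn, logical_dates):
    """Return {logical_date: state} of the most recently started run per date, in one query."""
    if not logical_dates:
        return {}
    placeholders = ", ".join("?" for _ in logical_dates)
    rows = conn.execute(
        "SELECT logical_date, state FROM dag_run "
        f"WHERE logical_date IN ({placeholders}) "
        "ORDER BY start_date ASC",
        list(logical_dates),
    )
    latest = {}
    for logical_date, state in rows:
        # Rows arrive oldest first, so a later run overwrites an earlier one.
        latest[logical_date] = state
    return latest


wanted = ["2024-01-01", "2024-01-02", "2024-01-03"]
latest = latest_runs_by_date(conn, wanted)
```

The loop over the candidate dates can then consult the `latest` mapping in memory, so the number of database round trips no longer grows with the length of the date range.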
##########
airflow/api_fastapi/core_api/datamodels/backfills.py:
##########
@@ -56,3 +57,15 @@ class BackfillCollectionResponse(BaseModel):
backfills: list[BackfillResponse]
total_entries: int
+
+
+class BackfillRunInfo(BaseModel):
+ """Data model for run information during a backfill operation."""
+
+ logical_date: datetime
+
+
+class BackfillDryRunResponse(BaseModel):
+ """Serializer for responses in dry-run mode for backfill operations."""
+
+ run_info_list: list[BackfillRunInfo]
Review Comment:
This should have the same format as any other `list` endpoint, returning a
`CollectionResponse`.
You can take a look at:
- DagRunCollectionResponse
- TaskInstanceCollectionResponse
- TaskInstanceHistoryCollectionResponse
- EventLogCollectionResponse
- ....
Basically a `total_entries` key and another key for your object list. (most
likely not `run_info_list`, just `dry_run_backfills` or just `backfills`)
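A possible shape for such a response is sketched below. The class names `DryRunBackfillResponse` and `DryRunBackfillCollectionResponse` and the `build_collection` helper are hypothetical, and stdlib dataclasses stand in for pydantic's `BaseModel` only to keep the example dependency-free.

```python
from dataclasses import asdict, dataclass
from datetime import datetime


@dataclass
class DryRunBackfillResponse:
    """Hypothetical item model: one run the backfill would create."""

    logical_date: datetime


@dataclass
class DryRunBackfillCollectionResponse:
    """Hypothetical collection model: an object list plus `total_entries`."""

    backfills: list[DryRunBackfillResponse]
    total_entries: int


def build_collection(logical_dates):
    # Wrap each date in an item and report the count alongside the list.
    items = [DryRunBackfillResponse(logical_date=d) for d in logical_dates]
    return DryRunBackfillCollectionResponse(backfills=items, total_entries=len(items))


response = build_collection([datetime(2024, 1, 1), datetime(2024, 1, 2)])
payload = asdict(response)  # plain dict with "backfills" and "total_entries" keys
```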