kaxil commented on code in PR #67962:
URL: https://github.com/apache/airflow/pull/67962#discussion_r3464063850
##########
providers/http/src/airflow/providers/http/operators/http.py:
##########
@@ -209,6 +209,12 @@ def paginate_sync(self, response: Response) -> Response | list[Response]:
         return all_responses
     def execute_async(self, context: Context) -> None:
+        if self.method.upper() not in ("GET", "HEAD", "OPTIONS"):
+            self.log.warning(
+                "HttpOperator with deferrable=True and method=%s may send duplicate requests if the Triggerer restarts.",
+                self.method,
+            )
Review Comment:
You're right that `HttpTrigger` can't poll for a request that's already in
flight, and I'm not suggesting we touch the trigger at all.
The shape I mean is similar to what we did for `FileSensor.execute()`:
https://github.com/apache/airflow/blob/ce72c02c5a0f4ba2e83465fff7024a344b06ad53/providers/standard/src/airflow/providers/standard/sensors/filesystem.py#L120-L132
It runs the actual operation (`poke`) on the worker first and only calls
`self.defer()` if there's still something to wait for. `FileTrigger` is
untouched; the restructuring is entirely in the operator's `execute()`.
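To make that shape concrete, here is a toy sketch of "run once on the worker, defer only if needed". It is not the actual `FileSensor` code; `ToySensor` and `Deferred` are made-up names standing in for the operator and for the hand-off to the triggerer.

```python
class Deferred(Exception):
    """Hypothetical stand-in for the hand-off from worker to triggerer."""


class ToySensor:
    def __init__(self, poke_results):
        # poke_results: booleans returned by successive poke() calls
        self._results = iter(poke_results)
        self.poke_calls = 0

    def poke(self):
        # The actual check; safe to repeat because it only reads state.
        self.poke_calls += 1
        return next(self._results)

    def defer(self):
        raise Deferred()

    def execute(self):
        # Run the check once on the worker first...
        if not self.poke():
            # ...and hand off only if there is still something to wait for.
            self.defer()
        return "done on worker"
```

If the first `poke()` already succeeds, the task finishes on the worker and the trigger is never involved.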
The reason `FileSensor` can defer at all is that `poke` is idempotent, so if
the triggerer restarts and re-runs it, nothing breaks. A
`POST`/`PUT`/`PATCH`/`DELETE` doesn't have that property, and that's exactly
the bug: the request lives in `HttpTrigger.run()`, which restarts from scratch
on a triggerer restart and fires the request a second time.
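A small simulation of that failure mode, under the assumption that a restarted triggerer rebuilds the trigger from its serialized kwargs and calls `run()` again. `FakeServer` and `ToyTrigger` are illustrative names, not Airflow classes.

```python
class FakeServer:
    """Records every write request it receives."""

    def __init__(self):
        self.received = []

    def handle(self, method, body):
        self.received.append((method, body))
        return {"status": 201}


class ToyTrigger:
    """Holds only its kwargs; run() has no memory of earlier attempts."""

    def __init__(self, server, method, body):
        self.server = server
        self.method = method
        self.body = body

    def run(self):
        return self.server.handle(self.method, self.body)


server = FakeServer()
kwargs = {"method": "POST", "body": "order-42"}

# The first triggerer process starts the trigger and sends the request...
ToyTrigger(server, **kwargs).run()
# ...then restarts before the result is recorded, so the trigger is rebuilt
# from the same kwargs and run() starts from scratch.
ToyTrigger(server, **kwargs).run()

duplicates = len(server.received)  # the server saw the same write twice
```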
So the fix, imo, is to run non-idempotent requests on the worker (once per task
try, under normal retry semantics) and defer only the methods that are safe to
re-run:
```python
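def execute(self, context: Context) -> Any:
    # Defer only methods that are safe to replay if the triggerer restarts.
    if self.deferrable and self.method.upper() in ("GET", "HEAD", "OPTIONS"):
        self.execute_async(context=context)
    else:
        # Writes run on the worker, once per task try.
        return self.execute_sync(context=context)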
```
No trigger changes, just a branch in `execute()`. This is the "long-term
fix" the issue describes, if I am reading it correctly.
On whether it's even possible to make a write safe *in the trigger*: it is,
but only if the server honors an idempotency key (`Idempotency-Key` style), and
the key has to be minted on the worker and passed into the trigger's serialized
kwargs so the replay sends the same one. Without server-side dedup it's the
exactly-once-delivery problem and can't be solved from the client. Worth adding
as an opt-in later, but not needed here.
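For reference, a rough sketch of what that opt-in could look like, assuming a server that deduplicates on an `Idempotency-Key` header. `DedupServer`, `build_trigger_kwargs`, and `run_trigger` are hypothetical helpers for illustration, not existing Airflow or provider APIs.

```python
import uuid


class DedupServer:
    """Toy server that honors an idempotency key (assumed behavior)."""

    def __init__(self):
        self.results = {}  # idempotency key -> stored response
        self.writes = 0

    def handle(self, method, body, headers):
        key = headers["Idempotency-Key"]
        if key in self.results:
            # Replay of a request already applied: return the stored response.
            return self.results[key]
        self.writes += 1
        response = {"status": 201, "write_no": self.writes}
        self.results[key] = response
        return response


def build_trigger_kwargs(method, body):
    # Runs on the worker: the key is minted once and serialized with the kwargs.
    return {
        "method": method,
        "body": body,
        "headers": {"Idempotency-Key": str(uuid.uuid4())},
    }


def run_trigger(server, kwargs):
    # Runs in the triggerer: every (re)start sends the same serialized key.
    return server.handle(kwargs["method"], kwargs["body"], kwargs["headers"])


server = DedupServer()
kwargs = build_trigger_kwargs("POST", "order-42")
first = run_trigger(server, kwargs)
replay = run_trigger(server, kwargs)  # simulated triggerer restart
```

The key must not be generated inside the trigger itself, otherwise each restart would mint a fresh one and the server could not recognize the replay.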
A `log.warning` leaves the duplicate request in place and only narrates the
risk; I'm not even sure it is useful 🤷. Running the write on the worker
removes the duplicate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.