kaxil commented on code in PR #67962:
URL: https://github.com/apache/airflow/pull/67962#discussion_r3464063850


##########
providers/http/src/airflow/providers/http/operators/http.py:
##########
@@ -209,6 +209,12 @@ def paginate_sync(self, response: Response) -> Response | list[Response]:
         return all_responses
 
     def execute_async(self, context: Context) -> None:
+        if self.method.upper() not in ("GET", "HEAD", "OPTIONS"):
+            self.log.warning(
+                "HttpOperator with deferrable=True and method=%s may send duplicate requests if the Triggerer restarts.",
+                self.method,
+            )

Review Comment:
   You're right that `HttpTrigger` can't poll for a request that's already in 
flight, and I'm not suggesting we touch the trigger at all. 
   
   The shape I mean is similar to what we did for `FileSensor.execute()`:
   
   
https://github.com/apache/airflow/blob/ce72c02c5a0f4ba2e83465fff7024a344b06ad53/providers/standard/src/airflow/providers/standard/sensors/filesystem.py#L120-L132
   
   It runs the actual operation (`poke`) on the worker first and only calls 
`self.defer()` if there's still something to wait for. `FileTrigger` is 
untouched; the restructuring is entirely in the operator's `execute()`.
   
   The reason `FileSensor` can defer at all is that `poke` is idempotent, so if 
the triggerer restarts and re-runs it, nothing breaks. A 
`POST`/`PUT`/`PATCH`/`DELETE` can't be assumed to have that property (HTTP 
defines `PUT` and `DELETE` as idempotent, but they still change state and real 
endpoints don't always honor that), and that's exactly the bug: the request 
lives in `HttpTrigger.run()`, which restarts from scratch on a triggerer 
restart and fires the request a second time.
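   
   To make the failure mode concrete, here is a toy simulation. It is not 
Airflow code: `FakeServer`, `run_trigger`, and `simulate_restart` are 
hypothetical stand-ins, and the "restart" is modelled as cancelling the first 
coroutine after the request has already left.

```python
import asyncio


class FakeServer:
    """Hypothetical stand-in for a remote API; it only records requests."""

    def __init__(self) -> None:
        self.received: list[str] = []

    async def handle(self, method: str) -> int:
        self.received.append(method)  # the side effect happens on arrival
        await asyncio.sleep(0.05)  # simulate a slow response
        return 200


async def run_trigger(server: FakeServer, method: str) -> int:
    """Stand-in for a trigger's run(): the request is sent from inside it."""
    return await server.handle(method)


async def simulate_restart(method: str) -> FakeServer:
    server = FakeServer()
    # First attempt: the request goes out, then the "triggerer" dies
    # before the response comes back.
    first = asyncio.create_task(run_trigger(server, method))
    await asyncio.sleep(0.01)
    first.cancel()
    try:
        await first
    except asyncio.CancelledError:
        pass
    # After the restart, run() starts from scratch and sends it again.
    await run_trigger(server, method)
    return server


server = asyncio.run(simulate_restart("POST"))
print(server.received)
```

   The server sees the write twice even though the task only asked for it 
once, which is harmless for a read and a real problem for a write.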
   
   So the fix, in my view, is to run non-idempotent requests on the worker 
(once per task try, under normal retry semantics) and defer only the methods 
that are safe to re-run:
   
   ```python
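   def execute(self, context: Context) -> Any:
       # Only defer methods that are safe to replay if the triggerer restarts.
       if self.deferrable and self.method.upper() in ("GET", "HEAD", "OPTIONS"):
           self.execute_async(context=context)
       else:
           # Writes run on the worker, once per task try.
           return self.execute_sync(context=context)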
   ```
   
   No trigger changes, just a branch in `execute()`. This is the "long-term 
fix" the issue describes if I am reading it correctly.
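   
   If it helps with unit tests, the condition could be pulled out into a small 
pure helper. This is only a sketch: `SAFE_TO_REPLAY` and `should_defer` are 
names made up here, not existing `HttpOperator` attributes.

```python
# Methods whose replay after a triggerer restart is assumed to be harmless.
SAFE_TO_REPLAY = frozenset({"GET", "HEAD", "OPTIONS"})


def should_defer(method: str, deferrable: bool) -> bool:
    """Defer only if the operator is deferrable and the method is replay-safe."""
    return deferrable and method.upper() in SAFE_TO_REPLAY


for method in ("get", "POST", "DELETE"):
    print(method, should_defer(method, deferrable=True))
```

   `execute()` would then branch on `should_defer(self.method, self.deferrable)`, 
and the method list lives in one place instead of being repeated.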
   
   On whether it's even possible to make a write safe *in the trigger*: it is, 
but only if the server honors an idempotency key (`Idempotency-Key` style), and 
the key has to be minted on the worker and passed into the trigger's serialized 
kwargs so the replay sends the same one. Without server-side dedup it's the 
exactly-once-delivery problem and can't be solved from the client. Worth adding 
as an opt-in later, but not needed here.
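   
   A rough sketch of that opt-in, under stated assumptions: `DedupServer` is a 
hypothetical server that deduplicates on the header, and 
`build_trigger_kwargs` / `run_trigger` are illustrative names, not the real 
`HttpTrigger` API. The point is only where the key gets minted.

```python
from __future__ import annotations

import uuid


class DedupServer:
    """Hypothetical server that honors an Idempotency-Key header."""

    def __init__(self) -> None:
        self.seen: dict[str, int] = {}
        self.writes = 0

    def post(self, headers: dict[str, str]) -> int:
        key = headers["Idempotency-Key"]
        if key not in self.seen:
            self.writes += 1  # the side effect happens only for a new key
            self.seen[key] = 201
        return self.seen[key]  # a replay gets the stored status back


def build_trigger_kwargs(headers: dict[str, str] | None = None) -> dict:
    """Worker side: mint the key once and store it in the trigger kwargs."""
    merged = dict(headers or {})
    merged.setdefault("Idempotency-Key", str(uuid.uuid4()))
    return {"method": "POST", "headers": merged}


def run_trigger(server: DedupServer, kwargs: dict) -> int:
    """Trigger side: reuse the serialized headers, never mint a key here."""
    return server.post(kwargs["headers"])


server = DedupServer()
kwargs = build_trigger_kwargs()
first = run_trigger(server, kwargs)
replay = run_trigger(server, kwargs)  # simulated re-run after a restart
print(server.writes)
```

   If the key were generated inside `run_trigger` instead, each replay would 
carry a fresh key and the server would have nothing to deduplicate on.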
   
   A `log.warning` leaves the duplicate request in place and only narrates the 
risk, so I'm not sure it's useful 🤷. Running the write on the worker removes 
the duplicate.
   



