jason810496 commented on code in PR #66911:
URL: https://github.com/apache/airflow/pull/66911#discussion_r3265070898


##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -869,22 +869,104 @@ def trigger(
             run_after=run_after,
         )
 
+        dag_run_exists_before_trigger = self._dag_run_exists(dag_id=dag_id, 
run_id=run_id)
+        if dag_run_exists_before_trigger is True:
+            if reset_dag_run:
+                log.info("Dag Run already exists; Resetting Dag Run.", 
dag_id=dag_id, run_id=run_id)
+                return self.clear(run_id=run_id, dag_id=dag_id)
+            log.info("Dag Run already exists!", dag_id=dag_id, run_id=run_id)
+            return ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS)
+
         try:
-            self.client.post(
-                f"dag-runs/{dag_id}/{run_id}", 
content=body.model_dump_json(exclude_defaults=True)
+            self.client._request_without_retry(
+                "POST", f"dag-runs/{dag_id}/{run_id}", 
content=body.model_dump_json(exclude_defaults=True)
             )
+        except (httpx.ReadError, httpx.ReadTimeout, httpx.RemoteProtocolError):
+            if (
+                dag_run_exists_before_trigger is False
+                and self._dag_run_exists(dag_id=dag_id, run_id=run_id, 
retry=True) is True
+            ):
+                log.info(
+                    "Dag Run exists after ambiguous trigger response; treating 
trigger as successful.",
+                    dag_id=dag_id,
+                    run_id=run_id,
+                )
+                return OKResponse(ok=True)
+            raise
         except ServerResponseError as e:
             if e.response.status_code == HTTPStatus.CONFLICT:
                 if reset_dag_run:
-                    log.info("Dag Run already exists; Resetting Dag Run.", 
dag_id=dag_id, run_id=run_id)
+                    log.info(
+                        "Dag Run already exists after trigger attempt; 
Resetting Dag Run.",
+                        detail=e.detail,
+                        dag_id=dag_id,
+                        run_id=run_id,
+                    )
                     return self.clear(run_id=run_id, dag_id=dag_id)
-
-                log.info("Dag Run already exists!", detail=e.detail, 
dag_id=dag_id, run_id=run_id)
+                log.info(
+                    "Dag Run already exists after trigger attempt.",
+                    detail=e.detail,
+                    dag_id=dag_id,
+                    run_id=run_id,
+                )
                 return ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS)
             raise
 
         return OKResponse(ok=True)
 
+    def _dag_run_exists(self, dag_id: str, run_id: str, *, retry: bool = 
False) -> bool | None:
+        """Return whether the Dag run exists, or None when the detail endpoint 
is unavailable."""
+        if self.client._dry_run:
+            return None
+
+        try:
+            if retry:
+                self.client.get(f"dag-runs/{dag_id}/{run_id}")
+            else:
+                self.client._request_without_retry("GET", 
f"dag-runs/{dag_id}/{run_id}")
+        except httpx.RequestError:
+            return None
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.NOT_FOUND:
+                return False
+            if e.response.status_code == HTTPStatus.METHOD_NOT_ALLOWED:
+                # Older execution API servers may not support the Dag run 
detail endpoint yet.
+                return None
+            if (
+                e.response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY
+                and run_id == "previous"
+                and self._is_legacy_previous_dag_run_route_response(e.response)
+            ):
+                return None
+            if e.response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
+                return None
+            raise
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == HTTPStatus.NOT_FOUND:
+                return False
+            if e.response.status_code == HTTPStatus.METHOD_NOT_ALLOWED:
+                return None
+            if e.response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
+                return None
+            raise
+        return True

Review Comment:
   I would prefer to use `get_dr_count` (passing the `run_id` parameter) to 
check whether the DagRun exists.
   We could document the historical context here, and then we wouldn't need to 
handle the complex compatibility logic here.
   
   `get_dr_count` has been available since the first Airflow 3.0 release.
   
https://github.com/apache/airflow/blob/v3-0-stable/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py#L160-L181



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to