hkc-8010 commented on code in PR #66911:
URL: https://github.com/apache/airflow/pull/66911#discussion_r3266708428


##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -869,22 +869,104 @@ def trigger(
             run_after=run_after,
         )
 
+        dag_run_exists_before_trigger = self._dag_run_exists(dag_id=dag_id, 
run_id=run_id)
+        if dag_run_exists_before_trigger is True:
+            if reset_dag_run:
+                log.info("Dag Run already exists; Resetting Dag Run.", 
dag_id=dag_id, run_id=run_id)
+                return self.clear(run_id=run_id, dag_id=dag_id)
+            log.info("Dag Run already exists!", dag_id=dag_id, run_id=run_id)
+            return ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS)
+
         try:
-            self.client.post(
-                f"dag-runs/{dag_id}/{run_id}", 
content=body.model_dump_json(exclude_defaults=True)
+            self.client._request_without_retry(
+                "POST", f"dag-runs/{dag_id}/{run_id}", 
content=body.model_dump_json(exclude_defaults=True)
             )
+        except (httpx.ReadError, httpx.ReadTimeout, httpx.RemoteProtocolError):
+            if (
+                dag_run_exists_before_trigger is False
+                and self._dag_run_exists(dag_id=dag_id, run_id=run_id, 
retry=True) is True
+            ):
+                log.info(
+                    "Dag Run exists after ambiguous trigger response; treating 
trigger as successful.",
+                    dag_id=dag_id,
+                    run_id=run_id,
+                )
+                return OKResponse(ok=True)
+            raise
         except ServerResponseError as e:
             if e.response.status_code == HTTPStatus.CONFLICT:
                 if reset_dag_run:
-                    log.info("Dag Run already exists; Resetting Dag Run.", 
dag_id=dag_id, run_id=run_id)
+                    log.info(
+                        "Dag Run already exists after trigger attempt; 
Resetting Dag Run.",
+                        detail=e.detail,
+                        dag_id=dag_id,
+                        run_id=run_id,
+                    )
                     return self.clear(run_id=run_id, dag_id=dag_id)
-
-                log.info("Dag Run already exists!", detail=e.detail, 
dag_id=dag_id, run_id=run_id)
+                log.info(
+                    "Dag Run already exists after trigger attempt.",
+                    detail=e.detail,
+                    dag_id=dag_id,
+                    run_id=run_id,
+                )
                 return ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS)
             raise
 
         return OKResponse(ok=True)
 
+    def _dag_run_exists(self, dag_id: str, run_id: str, *, retry: bool = 
False) -> bool | None:
+        """Return whether the Dag run exists, or None when the detail endpoint 
is unavailable."""
+        if self.client._dry_run:
+            return None
+
+        try:
+            if retry:
+                self.client.get(f"dag-runs/{dag_id}/{run_id}")
+            else:
+                self.client._request_without_retry("GET", 
f"dag-runs/{dag_id}/{run_id}")
+        except httpx.RequestError:
+            return None
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.NOT_FOUND:
+                return False
+            if e.response.status_code == HTTPStatus.METHOD_NOT_ALLOWED:
+                # Older execution API servers may not support the Dag run 
detail endpoint yet.
+                return None
+            if (
+                e.response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY
+                and run_id == "previous"
+                and self._is_legacy_previous_dag_run_route_response(e.response)
+            ):
+                return None
+            if e.response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
+                return None
+            raise
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == HTTPStatus.NOT_FOUND:
+                return False
+            if e.response.status_code == HTTPStatus.METHOD_NOT_ALLOWED:
+                return None
+            if e.response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
+                return None
+            raise
+        return True

Review Comment:
   Done in 8a9259d5c0f. Replaced `_dag_run_exists` entirely with
`self.get_count(dag_id=dag_id, run_ids=[run_id])`. The pre-check and the
follow-up ambiguous-POST probe both now use the `dag-runs/count` endpoint.



##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -869,22 +869,104 @@ def trigger(
             run_after=run_after,
         )
 
+        dag_run_exists_before_trigger = self._dag_run_exists(dag_id=dag_id, 
run_id=run_id)

Review Comment:
   Done in 8a9259d5c0f. Both the pre-check (line 873) and the follow-up probe
after an ambiguous POST (line 888) now call
`self.get_count(dag_id=dag_id, run_ids=[run_id])` directly.



##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -869,22 +869,104 @@ def trigger(
             run_after=run_after,
         )
 
+        dag_run_exists_before_trigger = self._dag_run_exists(dag_id=dag_id, 
run_id=run_id)
+        if dag_run_exists_before_trigger is True:
+            if reset_dag_run:
+                log.info("Dag Run already exists; Resetting Dag Run.", 
dag_id=dag_id, run_id=run_id)
+                return self.clear(run_id=run_id, dag_id=dag_id)

Review Comment:
   Added a TODO at the `clear()` call site in `trigger()` noting this as a
follow-up. Happy to open a tracking issue if that would help.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to