Copilot commented on code in PR #64051:
URL: https://github.com/apache/airflow/pull/64051#discussion_r3025331371


##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -82,7 +87,15 @@ def execute(self, context: Context) -> None:
         job_object = hook.submit_sync_connection(connection_id=self.connection_id)
         self.job_id = job_object.job_id
         state = job_object.status
+
+        # Derive absolute deadlines for deferrable execution.
+        # execution_timeout is a hard task-level limit (cancels the job),
+        # while timeout only limits how long we wait for the job to finish.
+        # If both are set, the earliest deadline wins.
         end_time = time.time() + self.timeout
+        execution_deadline = None
+        if self.execution_timeout:

Review Comment:
   `if self.execution_timeout:` relies on truthiness; `datetime.timedelta(0)` 
is falsy, which would silently disable the execution deadline even though an 
explicit (zero) execution timeout is set. Use an explicit `is not None` check 
instead.
   ```suggestion
           if self.execution_timeout is not None:
   ```
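   The failure mode described here can be reproduced with the standard library alone; this is an illustrative sketch, not operator code:

   ```python
   from datetime import timedelta

   # An explicit zero execution timeout, as a user might configure.
   execution_timeout = timedelta(0)

   # Truthiness check: timedelta(0) is falsy, so the deadline would be
   # silently skipped even though the user explicitly set one.
   assert not execution_timeout

   # Explicit check: only a genuinely unset (None) timeout opts out.
   assert execution_timeout is not None
   ```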



##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         Relies on trigger to throw an exception, otherwise it assumes execution was
         successful.
         """
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
         if event["status"] == "error":
             self.log.debug("Error occurred with context: %s", context)
+            raise RuntimeError(event["message"])
+
+        job_id = event.get("job_id")
+        if event["status"] == "timeout":
+            if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+                try:
+                    hook.cancel_job(job_id=job_id)
+                except Exception as e:
+                    self.log.warning("Failed to cancel Airbyte job %s: %s", job_id, e)
+            else:
+                self.log.warning("No job_id found; skipping cancellation")
+
+            raise RuntimeError(event["message"])
+
+        job_id = event.get("job_id")
+        if event["status"] == "timeout":
+            if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+                hook.cancel_job(job_id=job_id)
+            else:
+                self.log.warning("No job_id found; skipping cancellation")
+
             raise AirflowException(event["message"])
 

Review Comment:
   There are two duplicated `if event["status"] == "timeout"` blocks. The 
second one is unreachable because the first block always raises, and the two 
blocks also disagree on which exception type to raise. Remove the duplicate 
block and keep a single timeout-handling path.
   ```suggestion
   
   ```
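   A single consolidated path could look like the following sketch; the `hook` argument and the plain `TimeoutError` are stand-ins here, not the operator's real hook or its final exception choice:

   ```python
   import logging

   log = logging.getLogger(__name__)

   def handle_timeout_event(event: dict, hook) -> None:
       """Best-effort job cancellation, then raise exactly once."""
       job_id = event.get("job_id")
       if job_id:
           log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
           try:
               hook.cancel_job(job_id=job_id)
           except Exception as e:
               log.warning("Failed to cancel Airbyte job %s: %s", job_id, e)
       else:
           log.warning("No job_id found; skipping cancellation")
       raise TimeoutError(event["message"])
   ```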



##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         Relies on trigger to throw an exception, otherwise it assumes execution was
         successful.
         """
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
         if event["status"] == "error":
             self.log.debug("Error occurred with context: %s", context)
+            raise RuntimeError(event["message"])
+
+        job_id = event.get("job_id")
+        if event["status"] == "timeout":
+            if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+                try:
+                    hook.cancel_job(job_id=job_id)
+                except Exception as e:
+                    self.log.warning("Failed to cancel Airbyte job %s: %s", job_id, e)
+            else:
+                self.log.warning("No job_id found; skipping cancellation")
+
+            raise RuntimeError(event["message"])

Review Comment:
   In Airflow operators, raising `RuntimeError` makes it harder to classify 
failures consistently (and conflicts with the docstring note about using more 
specific Airflow exceptions). For timeouts, prefer `AirflowTaskTimeout` (or 
another appropriate Airflow exception type) so the failure is reported as a 
task timeout; for trigger-reported errors, prefer 
`AirflowException`/`AirflowFailException`. This will require updating the new 
tests that currently assert `RuntimeError`.
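
   A status-to-exception dispatch along the suggested lines might look like this; the two exception classes are local stand-ins for `airflow.exceptions.AirflowException` and `airflow.exceptions.AirflowTaskTimeout` so the sketch runs without an Airflow install:

   ```python
   class AirflowException(Exception):
       """Stand-in for airflow.exceptions.AirflowException."""

   class AirflowTaskTimeout(Exception):
       """Stand-in for airflow.exceptions.AirflowTaskTimeout."""

   def raise_for_event(event: dict) -> None:
       """Translate a trigger event into a classifiable task failure."""
       if event["status"] == "timeout":
           raise AirflowTaskTimeout(event["message"])
       if event["status"] == "error":
           raise AirflowException(event["message"])
   ```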



##########
providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py:
##########
@@ -70,6 +75,17 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
         hook = AirbyteHook(airbyte_conn_id=self.conn_id)
         try:
             while await self.is_still_running(hook):
+                if self.execution_deadline is not None:
+                    if self.execution_deadline < time.time():
+                        yield TriggerEvent(
+                            {
+                                "status": "timeout",
+                                "message": f"Job run {self.job_id} has reached execution timeout.",
+                                "job_id": self.job_id,
+                            }
+                        )
+                        return
+
                 if self.end_time < time.time():
                     yield TriggerEvent(

Review Comment:
   The execution deadline is only checked while `is_still_running()` is True. 
If the trigger first observes the job in a terminal state after the deadline 
has elapsed (e.g., triggerer backlog), it will skip the deadline check and can 
incorrectly emit `success`/`error` instead of `timeout`. Consider restructuring 
the loop so the deadline check happens before/independent of 
`is_still_running()` (and also before yielding terminal-status events). Also, 
using `<=` instead of `<` for deadline comparisons avoids a boundary-condition 
delay until the next poll.
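
   One way to restructure, sketched as a plain function in place of the async trigger loop (`job_is_running` stands in for `is_still_running()`, and the returned dicts for `TriggerEvent` payloads):

   ```python
   import time

   def wait_for_job(job_is_running, end_time, execution_deadline=None,
                    poll_interval=0.01):
       """Poll until a deadline or a terminal state, deadlines first.

       Deadlines are checked before the running check, so a job first
       observed terminal after the deadline still reports "timeout";
       `<=` fires exactly at the boundary instead of one poll later.
       """
       while True:
           now = time.time()
           if execution_deadline is not None and execution_deadline <= now:
               return {"status": "timeout", "reason": "execution_timeout"}
           if end_time <= now:
               return {"status": "timeout", "reason": "wait_timeout"}
           if not job_is_running():
               return {"status": "success"}
           time.sleep(poll_interval)
   ```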




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
