Copilot commented on code in PR #64051:
URL: https://github.com/apache/airflow/pull/64051#discussion_r3066494627


##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         Relies on trigger to throw an exception, otherwise it assumes execution was
         successful.
         """
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
         if event["status"] == "error":
             self.log.debug("Error occurred with context: %s", context)
+            raise RuntimeError(event["message"])
+
+        job_id = event.get("job_id")
+        if event["status"] == "timeout":
+            if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+                try:
+                    hook.cancel_job(job_id=job_id)
+                except Exception as e:
+                    self.log.warning("Failed to cancel Airbyte job %s: %s", job_id, e)
+            else:
+                self.log.warning("No job_id found; skipping cancellation")
+
+            raise RuntimeError(event["message"])
+
+        job_id = event.get("job_id")
+        if event["status"] == "timeout":
+            if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+                hook.cancel_job(job_id=job_id)
+            else:
+                self.log.warning("No job_id found; skipping cancellation")
+

Review Comment:
   The `timeout` handling block is duplicated (lines 147–158 and 160–168). The 
second block is unreachable because the first one always raises, and it also 
changes behavior (no try/except + different exception type). Consolidate into a 
single `timeout` branch (with the intended best-effort cancellation semantics) 
and raise one consistent Airflow exception type for timeouts/errors.
   ```suggestion
   
   ```
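
One possible shape for the consolidated branch, sketched outside Airflow so that it runs standalone. `TaskFailure` and `TaskTimeout` are stand-ins for whichever Airflow exception types the PR settles on, and `handle_event` and `RecordingHook` are hypothetical names used only for illustration. The point is a single `timeout` branch that cancels on a best-effort basis and always raises afterwards.

```python
import logging

log = logging.getLogger(__name__)


class TaskFailure(Exception):
    """Stand-in for a generic Airflow exception (assumption, not the real class)."""


class TaskTimeout(TaskFailure):
    """Stand-in for an Airflow timeout exception (assumption, not the real class)."""


class RecordingHook:
    """Hypothetical hook double: records cancellations, optionally fails."""

    def __init__(self, fail=False):
        self.fail = fail
        self.cancelled = []

    def cancel_job(self, job_id):
        if self.fail:
            raise ConnectionError("cancel request failed")
        self.cancelled.append(job_id)


def handle_event(event, hook):
    """Handle a trigger event with one error branch and one timeout branch."""
    status = event["status"]
    if status == "error":
        raise TaskFailure(event["message"])
    if status == "timeout":
        job_id = event.get("job_id")
        if job_id:
            log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
            try:
                hook.cancel_job(job_id=job_id)
            except Exception as exc:
                # Best effort: a failed cancellation must not mask the timeout.
                log.warning("Failed to cancel Airbyte job %s: %s", job_id, exc)
        else:
            log.warning("No job_id found; skipping cancellation")
        raise TaskTimeout(event["message"])
    # Any other status is treated as success, as in the original docstring.
    return None
```

With this layout the timeout is reported even when cancellation itself fails, and callers can catch one exception family for both failure modes.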



##########
providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py:
##########
@@ -38,6 +38,8 @@ class AirbyteSyncTrigger(BaseTrigger):
     :param job_id: The ID of an Airbyte Sync job.
     :param end_time: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.

Review Comment:
   `end_time` is documented as a duration ('Time in seconds to wait') but the 
trigger logic compares it to `time.time()`, implying it is an absolute 
timestamp (deadline). Since this PR introduces another absolute deadline 
(`execution_deadline`), it’s important to clarify `end_time` as an absolute 
timestamp too (or rename/rework it) to avoid API/documentation mismatch.
   ```suggestion
       :param end_time: Absolute timestamp (in seconds since the epoch) by which the job run
           must reach a terminal status. Defaults to 7 days from the trigger start time.
   ```
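
To illustrate why the distinction matters, here is a small sketch of what happens when a duration is passed where a deadline is expected. `deadline_from_timeout` and `is_expired` are hypothetical helpers, and the `end_time < now` comparison is an assumption about how the trigger checks the deadline, not a copy of its code.

```python
import time


def deadline_from_timeout(timeout_seconds, now=None):
    """Convert a duration in seconds into an absolute epoch deadline."""
    now = time.time() if now is None else now
    return now + timeout_seconds


def is_expired(end_time, now=None):
    """Return True once the absolute deadline has passed (assumed check)."""
    now = time.time() if now is None else now
    return end_time < now


start = 1_700_000_000.0  # fixed epoch value so the example is deterministic

# A caller who follows the old docstring passes a duration of one hour.
# Compared against the epoch clock, it looks expired immediately.
duration_misread_as_deadline = is_expired(3600, now=start)

# Converting the duration to a deadline first behaves as intended.
deadline = deadline_from_timeout(3600, now=start)
still_waiting = not is_expired(deadline, now=start + 10)
timed_out_later = is_expired(deadline, now=start + 3601)
```

Here `duration_misread_as_deadline` is true, which is the mismatch the docstring change is meant to prevent.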



##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -82,7 +87,15 @@ def execute(self, context: Context) -> None:
         job_object = hook.submit_sync_connection(connection_id=self.connection_id)
         self.job_id = job_object.job_id
         state = job_object.status
+
+        # Derive absolute deadlines for deferrable execution.
+        # execution_timeout is a hard task-level limit (cancels the job),
+        # while timeout only limits how long we wait for the job to finish.
+        # If both are set, the earliest deadline wins.
         end_time = time.time() + self.timeout
+        execution_deadline = None
+        if self.execution_timeout:
+            execution_deadline = time.time() + self.execution_timeout.total_seconds()

Review Comment:
   `if self.execution_timeout:` relies on truthiness and will skip deadline 
computation for `timedelta(0)` (which is falsy), even though a zero execution 
timeout should be enforced. Prefer an explicit `is not None` check, and 
consider capturing `now = time.time()` once to avoid drift between `end_time` 
and `execution_deadline` computations.
   ```suggestion
           now = time.time()
           end_time = now + self.timeout
           execution_deadline = None
           if self.execution_timeout is not None:
               execution_deadline = now + self.execution_timeout.total_seconds()
   ```
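
The truthiness pitfall can be reproduced in isolation. The two functions below are hypothetical reductions of the deadline computation, written only to contrast the checks; `timedelta(0)` evaluates as false in a boolean context, so the first variant silently drops a zero timeout.

```python
from datetime import timedelta


def deadline_truthy(now, execution_timeout):
    """Truthiness check: skips timedelta(0) because it is falsy."""
    if execution_timeout:
        return now + execution_timeout.total_seconds()
    return None


def deadline_explicit(now, execution_timeout):
    """Explicit None check: a zero timeout still yields a deadline."""
    if execution_timeout is not None:
        return now + execution_timeout.total_seconds()
    return None


now = 100.0  # captured once, so every derived deadline shares the same base
zero = timedelta(0)

skipped = deadline_truthy(now, zero)      # None: the zero timeout is ignored
enforced = deadline_explicit(now, zero)   # 100.0: the deadline is already due
unset = deadline_explicit(now, None)      # None: no timeout configured
```

Both variants agree for any non-zero timeout; they differ only in the zero case.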



##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
         Relies on trigger to throw an exception, otherwise it assumes execution was
         successful.
         """
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
         if event["status"] == "error":
             self.log.debug("Error occurred with context: %s", context)
+            raise RuntimeError(event["message"])

Review Comment:
   Raising `RuntimeError` from an operator callback is inconsistent with 
typical Airflow operator semantics and makes it harder to handle failures 
uniformly. Prefer raising an Airflow exception type (e.g., `AirflowException` 
for generic errors and `AirflowTaskTimeout` for execution timeouts), and update 
the new unit tests accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
