Copilot commented on code in PR #64051:
URL: https://github.com/apache/airflow/pull/64051#discussion_r3025331371
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -82,7 +87,15 @@ def execute(self, context: Context) -> None:
job_object = hook.submit_sync_connection(connection_id=self.connection_id)
self.job_id = job_object.job_id
state = job_object.status
+
+ # Derive absolute deadlines for deferrable execution.
+ # execution_timeout is a hard task-level limit (cancels the job),
+ # while timeout only limits how long we wait for the job to finish.
+ # If both are set, the earliest deadline wins.
end_time = time.time() + self.timeout
+ execution_deadline = None
+ if self.execution_timeout:
Review Comment:
`if self.execution_timeout:` relies on truthiness; `datetime.timedelta(0)`
is falsy, which would silently disable the execution deadline even though an
explicit (zero) execution timeout is set. Use an explicit `is not None` check
instead.
```suggestion
if self.execution_timeout is not None:
```
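To illustrate the pitfall: a minimal sketch showing why the truthiness check and the `is not None` check diverge for a zero timeout.

```python
import datetime

# timedelta(0) is falsy, so a bare truthiness check cannot distinguish
# "no timeout configured" (None) from "zero timeout explicitly set".
execution_timeout = datetime.timedelta(0)

deadline_via_truthiness = bool(execution_timeout)        # False: deadline silently skipped
deadline_via_none_check = execution_timeout is not None  # True: deadline still derived
```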
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
if event["status"] == "error":
self.log.debug("Error occurred with context: %s", context)
+ raise RuntimeError(event["message"])
+
+ job_id = event.get("job_id")
+ if event["status"] == "timeout":
+ if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+ try:
+ hook.cancel_job(job_id=job_id)
+ except Exception as e:
+                    self.log.warning("Failed to cancel Airbyte job %s: %s", job_id, e)
+ else:
+ self.log.warning("No job_id found; skipping cancellation")
+
+ raise RuntimeError(event["message"])
+
+ job_id = event.get("job_id")
+ if event["status"] == "timeout":
+ if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+ hook.cancel_job(job_id=job_id)
+ else:
+ self.log.warning("No job_id found; skipping cancellation")
+
raise AirflowException(event["message"])
Review Comment:
There are two duplicated `if event["status"] == "timeout"` blocks. The
second one is unreachable because the first block always raises, and the two
blocks also disagree on which exception type to raise. Remove the duplicate
block and keep a single timeout-handling path.
```suggestion
```
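A sketch of what the single consolidated timeout path could look like, pulled out as a standalone function for clarity (the logger, the `JobTimeoutError` stand-in, and the hook argument are hypothetical; the real operator uses `self.log`, an Airflow exception type, and `AirbyteHook`):

```python
import logging

log = logging.getLogger("airbyte_example")


class JobTimeoutError(Exception):
    """Stand-in for whichever Airflow exception the operator raises on timeout."""


def handle_timeout_event(event: dict, hook) -> None:
    """Cancel the remote job (best effort) and raise exactly once."""
    if event["status"] != "timeout":
        return
    job_id = event.get("job_id")
    if job_id:
        log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
        try:
            hook.cancel_job(job_id=job_id)
        except Exception as e:
            # Cancellation failure should not mask the timeout itself.
            log.warning("Failed to cancel Airbyte job %s: %s", job_id, e)
    else:
        log.warning("No job_id found; skipping cancellation")
    raise JobTimeoutError(event["message"])
```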
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
if event["status"] == "error":
self.log.debug("Error occurred with context: %s", context)
+ raise RuntimeError(event["message"])
+
+ job_id = event.get("job_id")
+ if event["status"] == "timeout":
+ if job_id:
+                self.log.info("Cancelling Airbyte job %s due to execution timeout", job_id)
+ try:
+ hook.cancel_job(job_id=job_id)
+ except Exception as e:
+                    self.log.warning("Failed to cancel Airbyte job %s: %s", job_id, e)
+ else:
+ self.log.warning("No job_id found; skipping cancellation")
+
+ raise RuntimeError(event["message"])
Review Comment:
In Airflow operators, raising `RuntimeError` makes it harder to classify
failures consistently (and conflicts with the docstring note about using more
specific Airflow exceptions). For timeouts, prefer `AirflowTaskTimeout` (or
another appropriate Airflow exception type) so the failure is reported as a
task timeout; for trigger-reported errors, prefer
`AirflowException`/`AirflowFailException`. This will require updating the new
tests that currently assert `RuntimeError`.
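To sketch the suggested classification: a small event-to-exception mapping, using stand-in classes so the example is self-contained (in the operator these would instead be imported from `airflow.exceptions`, which this sketch only assumes):

```python
# Stand-ins modelling airflow.exceptions.AirflowException / AirflowTaskTimeout.
class AirflowException(Exception):
    pass


class AirflowTaskTimeout(Exception):
    pass


def raise_for_event(event: dict) -> None:
    """Map trigger event statuses to distinct exception types."""
    if event["status"] == "timeout":
        # Surfaces as a task timeout rather than a generic failure.
        raise AirflowTaskTimeout(event["message"])
    if event["status"] == "error":
        raise AirflowException(event["message"])
    # Any other status is treated as success: no exception.
```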
##########
providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py:
##########
@@ -70,6 +75,17 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
hook = AirbyteHook(airbyte_conn_id=self.conn_id)
try:
while await self.is_still_running(hook):
+ if self.execution_deadline is not None:
+ if self.execution_deadline < time.time():
+ yield TriggerEvent(
+ {
+ "status": "timeout",
+                            "message": f"Job run {self.job_id} has reached execution timeout.",
+ "job_id": self.job_id,
+ }
+ )
+ return
+
if self.end_time < time.time():
yield TriggerEvent(
Review Comment:
The execution deadline is only checked while `is_still_running()` is True.
If the trigger first observes the job in a terminal state after the deadline
has elapsed (e.g., triggerer backlog), it will skip the deadline check and can
incorrectly emit `success`/`error` instead of `timeout`. Consider restructuring
the loop so the deadline check happens before/independent of
`is_still_running()` (and also before yielding terminal-status events). Also,
using `<=` instead of `<` for deadline comparisons avoids a boundary-condition
delay until the next poll.
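A simplified, synchronous sketch of the suggested restructuring (the function name, the `check_running` callable, and the event shapes are illustrative stand-ins, not the trigger's real API):

```python
import time


def poll_job(job_id, deadline, check_running, poll_interval=0.0):
    """Return a terminal event dict, checking the deadline before the state probe."""
    while True:
        # Deadline check comes first and is independent of the running probe,
        # so a job only observed terminal after the deadline (e.g. triggerer
        # backlog) still reports "timeout". Note `<=`, per the review, to avoid
        # waiting an extra poll cycle on the exact boundary.
        if deadline is not None and deadline <= time.time():
            return {"status": "timeout", "job_id": job_id,
                    "message": f"Job run {job_id} has reached execution timeout."}
        if not check_running():
            return {"status": "success", "job_id": job_id,
                    "message": f"Job run {job_id} has completed."}
        time.sleep(poll_interval)
```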
##########
providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py:
##########
@@ -125,8 +139,32 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
if event["status"] == "error":
self.log.debug("Error occurred with context: %s", context)
+ raise RuntimeError(event["message"])
Review Comment:
In Airflow operators, raising `RuntimeError` makes it harder to classify
failures consistently (and conflicts with the docstring note about using more
specific Airflow exceptions). For timeouts, prefer `AirflowTaskTimeout` (or
another appropriate Airflow exception type) so the failure is reported as a
task timeout; for trigger-reported errors, prefer
`AirflowException`/`AirflowFailException`. This will require updating the new
tests that currently assert `RuntimeError`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]