pankajastro commented on code in PR #44279:
URL: https://github.com/apache/airflow/pull/44279#discussion_r1862715519


##########
providers/src/airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -2592,8 +2592,15 @@ def _submit_job(
             nowait=True,
         )
 
-    @staticmethod
-    def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
+    def _handle_job_error(self, job: BigQueryJob | UnknownJob) -> None:
+        self.log.info("Job %s is completed. Checking the job status", 
self.job_id)
+        # I've noticed that sometimes BigQuery jobs transiently report the 
wrong status, causing the Airflow job to be incorrectly marked as successful.
+        # To avoid this, we refresh the job properties before checking the 
final state and handling any errors.
+        while job.state != "DONE":

Review Comment:
   Hi @eladkal,
   
   Thank you so much for your review and feedback; I really appreciate it. I 
understand your concerns regarding the handling of third-party library issues 
in Airflow, and I agree that addressing it in this manner might not be the 
ideal approach.
   
   The issue we encountered is quite unusual. We experienced it in production, 
where the job ultimately timed out after 6 hours, but the `job.result()` method 
returned before the job had actually finished.
   
   I suspect this could be related to network issues or temporary service 
disruptions that might have caused the j`ob.result()` to return prematurely. 
   
   Additionally, I feel that checking `job.error_result` may not always be 
sufficient to detect all errors. As noted in the BigQuery documentation, 
error_result may not be present in every error response from BigQuery.
   https://cloud.google.com/bigquery/docs/error-messages
   
   As for the opening issue in the upstream library, I'm hesitant to pursue it 
at this stage. I don't have a consistent way to reproduce it, nor enough 
evidence to definitively say that the problem lies with the BigQuery client or 
BigQuery service itself.
   
   To address the error handling, would it make sense to modify the 
`_handle_job_error` method to log any errors in `job.errors` and fail the job 
if `job.state != "DONE"`? Below is the proposed update:
   
   ```python
       def _handle_job_error(self, job: BigQueryJob | UnknownJob) -> None:
           self.log.info("Job %s is completed. Checking the job status", 
self.job_id)
           # Log any transient errors encountered during the job execution
           for error in job.errors or []:
               self.log.error("BigQuery Job Error: %s", error)
           if job.error_result:
               raise AirflowException(f"BigQuery job {job.job_id} failed: 
{job.error_result}")
           # Check the final state.
           if job.state != "DONE":
               raise AirflowException(f"Job failed with state: {job.state}")
   ``` 
   I would really appreciate your thoughts on whether this approach makes 
sense, or if you have any further suggestions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to