albertocalderari commented on a change in pull request #9590:
URL: https://github.com/apache/airflow/pull/9590#discussion_r448613697



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1692,32 +1692,52 @@ def prepare_template(self) -> None:
             with open(self.configuration, 'r') as file:
                 self.configuration = json.loads(file.read())
 
+    def _submit_job(self, hook: BigQueryHook, job_id: str):
+        # Submit a new job
+        job = hook.insert_job(
+            configuration=self.configuration,
+            project_id=self.project_id,
+            location=self.location,
+            job_id=job_id,
+        )
+        # Start the job and wait for it to complete and get the result.
+        job.result()
+        return job
+
     def execute(self, context: Any):
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
         )
 
-        job_id = self.job_id or f"airflow_{self.task_id}_{int(time())}"
+        exec_date = context['execution_date'].isoformat()
+        job_id = self.job_id or f"airflow_{self.dag_id}_{self.task_id}_{exec_date}"
+
         try:
-            job = hook.insert_job(
-                configuration=self.configuration,
-                project_id=self.project_id,
-                location=self.location,
-                job_id=job_id,
-            )
-            # Start the job and wait for it to complete and get the result.
-            job.result()
+            # Submit a new job
+            job = self._submit_job(hook, job_id)
         except Conflict:
+            # If the job already exists retrieve it
             job = hook.get_job(
                 project_id=self.project_id,
                 location=self.location,
                 job_id=job_id,
             )
-            # Get existing job and wait for it to be ready
-            for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-                sleep(time_to_wait)
-                job.reload()
-                if job.done():
-                    break
+
+            if job.done() and job.error_result:
+                # The job exists and finished with an error and we are probably rerunning it
+                # So we have to make a new job_id because it has to be unique
+                job_id = f"{self.job_id}_{int(time())}"
+                job = self._submit_job(hook, job_id)
+            elif not job.done():
+                # The job is still running so wait for it to be ready
+                for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):

Review comment:
       If you use the Google client, you won't need to sleep and wait: the
       waiting loop is built into the job itself (calling `job.result()` waits
       for the job to complete).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to