albertocalderari commented on a change in pull request #9590:
URL: https://github.com/apache/airflow/pull/9590#discussion_r449076100



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1692,32 +1692,52 @@ def prepare_template(self) -> None:
             with open(self.configuration, 'r') as file:
                 self.configuration = json.loads(file.read())
 
+    def _submit_job(self, hook: BigQueryHook, job_id: str):
+        # Submit a new job
+        job = hook.insert_job(
+            configuration=self.configuration,
+            project_id=self.project_id,
+            location=self.location,
+            job_id=job_id,
+        )
+        # Start the job and wait for it to complete and get the result.
+        job.result()
+        return job
+
     def execute(self, context: Any):
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
         )
 
-        job_id = self.job_id or f"airflow_{self.task_id}_{int(time())}"
+        exec_date = context['execution_date'].isoformat()
+        job_id = self.job_id or f"airflow_{self.dag_id}_{self.task_id}_{exec_date}"
+
         try:
-            job = hook.insert_job(
-                configuration=self.configuration,
-                project_id=self.project_id,
-                location=self.location,
-                job_id=job_id,
-            )
-            # Start the job and wait for it to complete and get the result.
-            job.result()
+            # Submit a new job
+            job = self._submit_job(hook, job_id)
         except Conflict:
+            # If the job already exists retrieve it
             job = hook.get_job(
                 project_id=self.project_id,
                 location=self.location,
                 job_id=job_id,
             )
-            # Get existing job and wait for it to be ready
-            for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-                sleep(time_to_wait)
-                job.reload()
-                if job.done():
-                    break
+
+            if job.done() and job.error_result:
+                # The job exists and finished with an error and we are probably rerunning it
+                # So we have to make a new job_id because it has to be unique
+                job_id = f"{self.job_id}_{int(time())}"

Review comment:
       I see - yet you won't be able to re-poll for this job, since it uses the current time, which is not reproducible on an "eventual" next run.
   Though it's much better than what is there now.
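
   For what it's worth, a rough sketch of one alternative (not part of this PR; build_deterministic_job_id and the use of the task instance's try_number are just assumptions of mine): derive the uniqueness from the DAG / task / execution-date identity plus the try number instead of time(), so a later poll can recompute the same id:

       import hashlib
       import re


       def build_deterministic_job_id(dag_id, task_id, exec_date, try_number):
           """Build a job_id that a later run can recompute to re-poll the same BigQuery job."""
           # Hash the full identity so the id stays valid even if dag_id / task_id /
           # exec_date contain characters BigQuery job ids do not accept (only
           # letters, digits, '_' and '-' are allowed, e.g. no ':' from an ISO timestamp).
           digest = hashlib.md5(
               f"{dag_id}:{task_id}:{exec_date}:{try_number}".encode()
           ).hexdigest()
           # Keep a sanitized, human-readable prefix for easier lookup in the BigQuery UI.
           prefix = re.sub(r"[^a-zA-Z0-9_-]", "_", f"airflow_{dag_id}_{task_id}")
           return f"{prefix}_{try_number}_{digest}"

   Called from execute() it could look roughly like job_id = self.job_id or build_deterministic_job_id(self.dag_id, self.task_id, exec_date, context['ti'].try_number), and the Conflict branch could then reuse the same id instead of appending time().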



