jaketf commented on a change in pull request #9590:
URL: https://github.com/apache/airflow/pull/9590#discussion_r462599520



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1692,32 +1693,48 @@ def prepare_template(self) -> None:
             with open(self.configuration, 'r') as file:
                 self.configuration = json.loads(file.read())
 
+    def _submit_job(self, hook: BigQueryHook, job_id: str):
+        # Submit a new job
+        job = hook.insert_job(
+            configuration=self.configuration,
+            project_id=self.project_id,
+            location=self.location,
+            job_id=job_id,
+        )
+        # Start the job and wait for it to complete and get the result.
+        job.result()
+        return job
+
     def execute(self, context: Any):
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
         )
 
-        job_id = self.job_id or f"airflow_{self.task_id}_{int(time())}"
+        exec_date = re.sub("\:|\-|\+", "_", context['execution_date'].isoformat())
+        job_id = self.job_id or f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_"
+
         try:
-            job = hook.insert_job(
-                configuration=self.configuration,
-                project_id=self.project_id,
-                location=self.location,
-                job_id=job_id,
-            )
-            # Start the job and wait for it to complete and get the result.
-            job.result()
+            # Submit a new job
+            job = self._submit_job(hook, job_id)
         except Conflict:
+            # If the job already exists retrieve it

Review comment:
       > I think we will not be able to create universal behavior that will 
keep idempotency. Some users want to always execute SQL query, others want to 
use predetermined results, and others want to re-query only when it is 
backfill. All the cases sound correct and they cannot always be combined into a 
single operator without additional parameters.
   
   This is an interesting point. Perhaps we should consider bringing back the 
deprecated BigQueryOperator (contract: always run a new query) and then keep 
this BigQueryInsertJobOperator (contract: create the job only if a succeeded 
job with the same id does not already exist).
   
   IMHO we can have both behaviors in a single operator with a 
`force_rerun=True` parameter that controls how the job id is generated.
   ```python3
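   import hashlib, json, uuid

   # Random suffix when force_rerun is set (always a new job); otherwise a stable
   # digest of the config (hash() fails on dicts and str hashes vary per process).
   config_digest = hashlib.sha256(json.dumps(job_config, sort_keys=True).encode()).hexdigest()
   uniqueness_hash = uuid.uuid4().hex if self.force_rerun else config_digest
   
   job_id = f"{dag_id}{task_id}{exec_date}{uniqueness_hash}"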
   ```
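
To make the two contracts concrete, here is a minimal standalone sketch of the id generation. `build_job_id` and its parameters are hypothetical names for illustration, not something that exists in the PR. The sketch assumes the job configuration is JSON-serializable and uses a SHA-256 digest of its sorted JSON form as the "same config" fingerprint.

```python
import hashlib
import json
import re
import uuid
from datetime import datetime, timezone


def build_job_id(dag_id, task_id, execution_date, configuration, force_rerun=False):
    """Hypothetical helper: build a BigQuery job id for one task run."""
    # Same idea as the diff: replace ':', '-' and '+' in the ISO timestamp.
    exec_date = re.sub(r"[:\-+]", "_", execution_date.isoformat())
    if force_rerun:
        # Random suffix: every call yields a new id, so a new job is always submitted.
        uniqueness = uuid.uuid4().hex
    else:
        # Stable digest: the same configuration always maps to the same id,
        # so a resubmission hits the existing job instead of creating a new one.
        config_json = json.dumps(configuration, sort_keys=True)
        uniqueness = hashlib.sha256(config_json.encode("utf-8")).hexdigest()
    return f"airflow_{dag_id}_{task_id}_{exec_date}_{uniqueness}"


# Usage: identical inputs give identical ids unless force_rerun is set.
config = {"query": {"query": "SELECT 1", "useLegacySql": False}}
run_date = datetime(2020, 7, 1, tzinfo=timezone.utc)
stable_id = build_job_id("my_dag", "my_task", run_date, config)
forced_id = build_job_id("my_dag", "my_task", run_date, config, force_rerun=True)
```

With `force_rerun=False`, a retry or backfill of the same task with an unchanged configuration would produce the same id, so the `except Conflict` branch can pick up the existing job. Changing the query changes the digest and therefore the id.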




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

