jaketf commented on a change in pull request #9590:
URL: https://github.com/apache/airflow/pull/9590#discussion_r452971061



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1692,32 +1693,48 @@ def prepare_template(self) -> None:
             with open(self.configuration, 'r') as file:
                 self.configuration = json.loads(file.read())
 
+    def _submit_job(self, hook: BigQueryHook, job_id: str):
+        # Submit a new job
+        job = hook.insert_job(
+            configuration=self.configuration,
+            project_id=self.project_id,
+            location=self.location,
+            job_id=job_id,
+        )
+        # Start the job and wait for it to complete and get the result.
+        job.result()
+        return job
+
     def execute(self, context: Any):
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
         )
 
-        job_id = self.job_id or f"airflow_{self.task_id}_{int(time())}"
+        exec_date = re.sub("\:|\-|\+", "_", context['execution_date'].isoformat())
+        job_id = self.job_id or f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_"
+
         try:
-            job = hook.insert_job(
-                configuration=self.configuration,
-                project_id=self.project_id,
-                location=self.location,
-                job_id=job_id,
-            )
-            # Start the job and wait for it to complete and get the result.
-            job.result()
+            # Submit a new job
+            job = self._submit_job(hook, job_id)
         except Conflict:
+            # If the job already exists, retrieve it

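(Editor's note, not part of the diff: a minimal worked example of what the new deterministic job ID looks like, using made-up DAG and task IDs purely for illustration.)

```python
import re

# Hypothetical inputs for illustration; in the operator these come from the task context.
dag_id, task_id = "example_dag", "example_task"
iso_date = "2020-07-10T00:00:00+00:00"  # context['execution_date'].isoformat()

exec_date = re.sub(r"\:|\-|\+", "_", iso_date)
job_id = f"airflow_{dag_id}_{task_id}_{exec_date}_"
print(job_id)  # airflow_example_dag_example_task_2020_07_10T00_00_00_00_00_
```
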
Review comment:
       In short, the underlying table / partition could have changed between the original run and the new run, and in many cases the user will want to ensure that the result of this job is based on a fresh run of the query.

   Airflow cannot know if the underlying table(s) have changed since the original run.

   To give an example use case:
   Imagine a streaming job in charge of inserting records into an hourly partitioned fact table, and a DAG responsible for running some hourly analytics query (joins to dimension tables / aggregation). Once the processing-time watermark passes the end of the partition interval, the originally scheduled DAG run executes (the BigQueryInsertJobOperator task runs a query on the data available at that processing time). Then late data (event time << processing time) arrives that should still go to the old partition (because partitioning is on event time). When the streaming job detects this, it could use some mechanism (e.g. the REST API, or an alert to a human who manually re-runs the DAG for an execution date) to tell the DAG (and this BigQueryInsertJobOperator) to re-run, because the original run for this interval / partition was actually against incomplete data.
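
   To make the suggestion concrete, here is a minimal sketch of how the `except Conflict` branch could offer a fresh run. The `force_rerun` flag and the uuid suffix are hypothetical illustrations, not part of this PR, and the fragment assumes it sits inside `execute()` with `hook` and `job_id` already defined as in the diff above; the reattach branch assumes `BigQueryHook.get_job` is available.

```python
import uuid

from google.api_core.exceptions import Conflict

try:
    # Submit a new job under the deterministic job_id.
    job = self._submit_job(hook, job_id)
except Conflict:
    if getattr(self, "force_rerun", False):  # hypothetical opt-in flag
        # The data behind the query may have changed (e.g. late-arriving rows),
        # so resubmit under a unique job_id instead of reusing the old result.
        job_id = f"{job_id}_{uuid.uuid4().hex[:8]}"
        job = self._submit_job(hook, job_id)
    else:
        # Otherwise reattach to the already-existing job (assumed here to use hook.get_job).
        job = hook.get_job(project_id=self.project_id, location=self.location, job_id=job_id)
```

   Deterministic IDs would then still give idempotent retries, while a forced re-run recomputes over the now-complete partition.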




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

