jaketf commented on a change in pull request #9590:
URL: https://github.com/apache/airflow/pull/9590#discussion_r452971061
##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########

@@ -1692,32 +1693,48 @@ def prepare_template(self) -> None:
         with open(self.configuration, 'r') as file:
             self.configuration = json.loads(file.read())
 
+    def _submit_job(self, hook: BigQueryHook, job_id: str):
+        # Submit a new job
+        job = hook.insert_job(
+            configuration=self.configuration,
+            project_id=self.project_id,
+            location=self.location,
+            job_id=job_id,
+        )
+        # Start the job and wait for it to complete and get the result.
+        job.result()
+        return job
+
     def execute(self, context: Any):
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
         )
-        job_id = self.job_id or f"airflow_{self.task_id}_{int(time())}"
+        exec_date = re.sub("\:|\-|\+", "_", context['execution_date'].isoformat())
+        job_id = self.job_id or f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_"
+
         try:
-            job = hook.insert_job(
-                configuration=self.configuration,
-                project_id=self.project_id,
-                location=self.location,
-                job_id=job_id,
-            )
-            # Start the job and wait for it to complete and get the result.
-            job.result()
+            # Submit a new job
+            job = self._submit_job(hook, job_id)
         except Conflict:
+            # If the job already exists retrieve it

Review comment:
In short, the underlying table / partition could have changed between the original run and the new run, and in many cases the user will want to ensure that the result of this job is based on a fresh run of the query. Airflow cannot know whether the underlying table(s) have changed since the original run.

To give an example use case: imagine a streaming job in charge of inserting records into an hourly partitioned fact table, and a DAG responsible for running some hourly analytics query (joins to dimension tables / aggregation). Once the processing-time watermark passes the end of the partition interval, the original scheduled DAG run executes (the BigQueryInsertJobOperator task runs a query on the data as of that processing time).
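As an aside on the diff above: the deterministic job ID it builds from the execution date can be sketched standalone. The helper name `build_job_id` below is hypothetical (the operator does this inline), and the character stripping mirrors the diff's `re.sub`, written here as a raw-string character class to avoid invalid-escape warnings:

```python
import re
from datetime import datetime, timezone

def build_job_id(dag_id: str, task_id: str, execution_date: datetime) -> str:
    """Hypothetical helper mirroring the job-id scheme in the diff."""
    # Strip ':', '-' and '+' from the ISO timestamp; BigQuery job IDs may
    # only contain letters, digits, dashes and underscores.
    exec_date = re.sub(r"[:\-+]", "_", execution_date.isoformat())
    return f"airflow_{dag_id}_{task_id}_{exec_date}_"

job_id = build_job_id(
    "hourly_analytics", "run_query",
    datetime(2020, 7, 10, 14, 0, tzinfo=timezone.utc),
)
# job_id == "airflow_hourly_analytics_run_query_2020_07_10T14_00_00_00_00_"
```

Because the ID no longer contains a wall-clock timestamp, re-running the same task instance for the same execution date produces the same job ID, which is what makes `insert_job` raise `Conflict` on a re-run and lets the operator reattach to (or decide to re-run) the earlier job.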
Then late data (event time << processing time) arrives that should still go to the old partition (because partitioning is on event time). When the streaming job detects this, it could use some mechanism (e.g. the REST API, or an alert to a human who manually re-runs the DAG for an execution date) to tell the DAG (and this BigQueryInsertJobOperator) to re-run, because the original run for this interval / partition was actually against incomplete data.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org