This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 57ca63dd9ec Add new retry logic to BigQueryInsertJobOperator (#63181)
57ca63dd9ec is described below
commit 57ca63dd9ec7cdbb2317adf4015ab1fe909e7d14
Author: Nitochkin <[email protected]>
AuthorDate: Thu Mar 12 01:45:50 2026 +0100
Add new retry logic to BigQueryInsertJobOperator (#63181)
Co-authored-by: Anton Nitochkin <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../providers/google/cloud/hooks/bigquery.py | 3 +
.../providers/google/cloud/operators/bigquery.py | 68 +++++++++---
.../tests/unit/google/cloud/hooks/test_bigquery.py | 31 ++++++
.../unit/google/cloud/operators/test_bigquery.py | 119 +++++++++++++++++++++
4 files changed, 208 insertions(+), 13 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index c5b395b2fab..b5e033bcdd8 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1337,6 +1337,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
configuration: dict,
run_after: pendulum.DateTime | datetime | None = None,
force_rerun: bool = False,
+ try_number: int | None = None,
) -> str:
if force_rerun:
hash_base = str(uuid.uuid4())
@@ -1362,6 +1363,8 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
job_id_timestamp = pendulum.now("UTC")
job_id = f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
+ if try_number:
+ job_id += f"_{try_number}"
return re.sub(r"[:\-+.]", "_", job_id)
def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime | datetime | None:
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index f773d56f77a..ab364c9d594 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2382,6 +2382,25 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
if job.state != "DONE":
raise AirflowException(f"Job failed with state: {job.state}")
+ def _submit_new_job_on_retry(
+ self,
+ context: Any,
+ hook: BigQueryHook,
+ ) -> BigQueryJob | UnknownJob:
+ self.log.info("Job retry attempt, try_number is: %s", context["ti"].try_number)
+ self.job_id = hook.generate_job_id(
+ job_id=None,
+ dag_id=self.dag_id,
+ task_id=self.task_id,
+ logical_date=None,
+ configuration=self.configuration,
+ run_after=hook.get_run_after_or_logical_date(context),
+ force_rerun=self.force_rerun,
+ try_number=context["ti"].try_number,
+ )
+ job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id)
+ return job
+
def execute(self, context: Any):
hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
@@ -2391,6 +2410,16 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
if self.project_id is None:
self.project_id = hook.project_id
+ # Handles Operator retries when a user does not explicitly set a job_id.
+ # For example, if a previous job failed due to a 429 "Too Many Requests" error,
+ # the Operator will retry and resubmit the job. We need to ensure we don't lose
+ # the ability to reattach to this resubmitted job if Airflow components fail.
+ # To maintain backward compatibility, the try_number is appended to the job name
+ # only starting from the 2nd attempt.
+ ti_try_number = None
+ if self.job_id is None and context["ti"].try_number > 2:
+ ti_try_number = context["ti"].try_number - 1
+
self.job_id = hook.generate_job_id(
job_id=self.job_id,
dag_id=self.dag_id,
@@ -2399,6 +2428,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
configuration=self.configuration,
run_after=hook.get_run_after_or_logical_date(context),
force_rerun=self.force_rerun,
+ try_number=ti_try_number,
)
try:
@@ -2415,20 +2445,32 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
job_id=self.job_id,
)
- if job.state not in self.reattach_states:
- # Same job configuration, so we need force_rerun
- raise AirflowException(
- f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
- f"want to force rerun it consider setting `force_rerun=True`."
- f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
- )
+ # This block handles cases where job_id is None and a 429 error occurs.
+ # A new job_id will be generated because BigQuery does not allow rerunning an existing job once it reaches
+ # the DONE state. A 429 error can occur because many BigQueryInsertJobOperators are running in parallel
+ # and hitting quota limits. However, over time, other clients will finish their jobs, making it
+ # possible to execute a new job.
+ if (
+ job.state == "DONE"
+ and (job.error_result and "429" in job.error_result)
+ and context["ti"].try_number > 1
+ ):
+ job = self._submit_new_job_on_retry(context, hook) # type: ignore[no-redef]
+ else:
+ if job.state not in self.reattach_states:
+ # Same job configuration, so we need force_rerun
+ raise AirflowException(
+ f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
+ f"want to force rerun it consider setting `force_rerun=True`."
+ f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
+ )
- # Job already reached state DONE
- if job.state == "DONE":
- raise AirflowException("Job is already in state DONE. Can not reattach to this job.")
+ # Job already reached state DONE
+ if job.state == "DONE":
+ raise AirflowException("Job is already in state DONE. Can not reattach to this job.")
- # We are reattaching to a job
- self.log.info("Reattaching to existing Job in state %s", job.state)
+ # We are reattaching to a job
+ self.log.info("Reattaching to existing Job in state %s", job.state)
job_types = {
LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
@@ -2477,8 +2519,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
self._handle_job_error(job)
-
return self.job_id
+
if job.running():
self.defer(
timeout=self.execution_timeout,
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 6a6fa8b3f3b..e4a65b7f8b9 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -684,6 +684,37 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
)
assert job_id == expected_job_id
+ @mock.patch("airflow.providers.google.cloud.hooks.bigquery.md5")
+ @pytest.mark.parametrize(
+ ("test_dag_id", "expected_job_id", "try_number"),
+ [
+ ("test-dag-id-1.1",
"airflow_test_dag_id_1_1_test_job_id_2020_01_23T00_00_00_hash", None),
+ ("test-dag-id-1.2",
"airflow_test_dag_id_1_2_test_job_id_2020_01_23T00_00_00_hash_2", 2),
+ ("test-dag-id-1.3",
"airflow_test_dag_id_1_3_test_job_id_2020_01_23T00_00_00_hash_5", 5),
+ ],
+ ids=["test-dag-id-1.1", "test-dag-id-1.2", "test-dag-id-1.3"],
+ )
+ def test_job_id_validity_with_try_number(self, mock_md5, test_dag_id,
expected_job_id, try_number):
+ hash_ = "hash"
+ mock_md5.return_value.hexdigest.return_value = hash_
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
+
+ job_id = self.hook.generate_job_id(
+ job_id=None,
+ dag_id=test_dag_id,
+ task_id="test_job_id",
+ logical_date=None,
+ configuration=configuration,
+ run_after=datetime(2020, 1, 23),
+ try_number=try_number,
+ )
+ assert job_id == expected_job_id
+
def test_get_run_after_or_logical_date(self):
"""Test get_run_after_or_logical_date for both Airflow 3.x and pre-3.0
behavior."""
if AIRFLOW_V_3_0_PLUS:
diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index c1eed574ad0..2655132009b 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -1592,8 +1592,127 @@ class TestBigQueryInsertJobOperator:
run_after=ANY,
configuration=configuration,
force_rerun=True,
+ try_number=None,
)
+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_execute_generate_job_id_with_ti_try_number(self, mock_hook):
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
+
+ # Setup context with try_number > 2
+ context = MagicMock()
+ context["ti"].try_number = 3
+
+ mock_hook.return_value.get_run_after_or_logical_date.return_value = None
+ mock_hook.return_value.generate_job_id.return_value = "generated_job_id_with_try_number"
+ mock_hook.return_value.insert_job.return_value = MagicMock(
+ state="DONE", job_id="generated_job_id_with_try_number", error_result=False
+ )
+
+ op = BigQueryInsertJobOperator(
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=None, # job_id must be None to trigger the try_number logic
+ project_id=TEST_GCP_PROJECT_ID,
+ force_rerun=False,
+ )
+
+ op.execute(context=context)
+
+ mock_hook.return_value.generate_job_id.assert_called_once_with(
+ job_id=None,
+ dag_id=op.dag_id,
+ task_id=op.task_id,
+ logical_date=None,
+ configuration=configuration,
+ run_after=None,
+ force_rerun=False,
+ try_number=2,
+ )
+
+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_execute_conflict_429_retry_resubmits_job(self, mock_hook):
+ """
+ Tests that if a Conflict occurs and the retrieved job failed with a 429 error,
+ a new job_id is generated and the job is resubmitted.
+ """
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
+
+ # Setup context with try_number > 1
+ context = MagicMock()
+ context["ti"].try_number = 2
+
+ mock_hook.return_value.get_run_after_or_logical_date.return_value = None
+
+ # We need generate_job_id to return two different IDs for the two attempts
+ first_job_id = "initial_generated_job_id"
+ second_job_id = "retry_generated_job_id"
+ mock_hook.return_value.generate_job_id.side_effect = [first_job_id, second_job_id]
+
+ # First insert_job raises Conflict, second one succeeds
+ success_job = MagicMock(state="DONE", job_id=second_job_id, error_result=False)
+ mock_hook.return_value.insert_job.side_effect = [Conflict("already exists"), success_job]
+
+ # get_job returns a job in DONE state with a 429 error
+ failed_429_job = MagicMock(
+ job_id=first_job_id,
+ state="DONE",
+ error_result="Quota exceeded: Your project exceeded quota for...
HTTP 429 Too Many Requests",
+ )
+ mock_hook.return_value.get_job.return_value = failed_429_job
+
+ op = BigQueryInsertJobOperator(
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=None,
+ project_id=TEST_GCP_PROJECT_ID,
+ force_rerun=False,
+ )
+
+ result = op.execute(context=context)
+
+ # 1. Assert generate_job_id was called twice (initial + retry)
+ assert mock_hook.return_value.generate_job_id.call_count == 2
+
+ # Check the arguments of the second generate_job_id call (the retry)
+ mock_hook.return_value.generate_job_id.assert_called_with(
+ job_id=None,
+ dag_id=op.dag_id,
+ task_id=op.task_id,
+ logical_date=None,
+ configuration=configuration,
+ run_after=None,
+ force_rerun=False,
+ try_number=2, # Should pass context["ti"].try_number exactly
+ )
+
+ # 2. Assert insert_job was called twice (initial submit + resubmit)
+ assert mock_hook.return_value.insert_job.call_count == 2
+ mock_hook.return_value.insert_job.assert_called_with(
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=second_job_id, # Ensure the second call used the newly generated job_id
+ nowait=True,
+ project_id=TEST_GCP_PROJECT_ID,
+ retry=DEFAULT_RETRY,
+ timeout=None,
+ )
+
+ # 3. Assert the operator returns the final successful job_id
+ assert result == second_job_id
+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_openlineage_events(self, mock_hook):
job_id = "123456"