This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 57ca63dd9ec Add new retry logic to BigQueryInsertJobOperator (#63181)
57ca63dd9ec is described below

commit 57ca63dd9ec7cdbb2317adf4015ab1fe909e7d14
Author: Nitochkin <[email protected]>
AuthorDate: Thu Mar 12 01:45:50 2026 +0100

    Add new retry logic to BigQueryInsertJobOperator (#63181)
    
    Co-authored-by: Anton Nitochkin <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../providers/google/cloud/hooks/bigquery.py       |   3 +
 .../providers/google/cloud/operators/bigquery.py   |  68 +++++++++---
 .../tests/unit/google/cloud/hooks/test_bigquery.py |  31 ++++++
 .../unit/google/cloud/operators/test_bigquery.py   | 119 +++++++++++++++++++++
 4 files changed, 208 insertions(+), 13 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index c5b395b2fab..b5e033bcdd8 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1337,6 +1337,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         configuration: dict,
         run_after: pendulum.DateTime | datetime | None = None,
         force_rerun: bool = False,
+        try_number: int | None = None,
     ) -> str:
         if force_rerun:
             hash_base = str(uuid.uuid4())
@@ -1362,6 +1363,8 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
             job_id_timestamp = pendulum.now("UTC")
 
         job_id = f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
+        if try_number:
+            job_id += f"_{try_number}"
         return re.sub(r"[:\-+.]", "_", job_id)
 
     def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime | datetime | None:
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index f773d56f77a..ab364c9d594 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2382,6 +2382,25 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
         if job.state != "DONE":
             raise AirflowException(f"Job failed with state: {job.state}")
 
+    def _submit_new_job_on_retry(
+        self,
+        context: Any,
+        hook: BigQueryHook,
+    ) -> BigQueryJob | UnknownJob:
+        self.log.info("Job retry attempt, try_number is: %s", context["ti"].try_number)
+        self.job_id = hook.generate_job_id(
+            job_id=None,
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            logical_date=None,
+            configuration=self.configuration,
+            run_after=hook.get_run_after_or_logical_date(context),
+            force_rerun=self.force_rerun,
+            try_number=context["ti"].try_number,
+        )
+        job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id)
+        return job
+
     def execute(self, context: Any):
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
@@ -2391,6 +2410,16 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
         if self.project_id is None:
             self.project_id = hook.project_id
 
+        # Handles Operator retries when a user does not explicitly set a job_id.
+        # For example, if a previous job failed due to a 429 "Too Many Requests" error,
+        # the Operator will retry and resubmit the job. We need to ensure we don't lose
+        # the ability to reattach to this resubmitted job if Airflow components fail.
+        # To maintain backward compatibility, the try_number is appended to the job name
+        # only starting from the 2nd attempt.
+        ti_try_number = None
+        if self.job_id is None and context["ti"].try_number > 2:
+            ti_try_number = context["ti"].try_number - 1
+
         self.job_id = hook.generate_job_id(
             job_id=self.job_id,
             dag_id=self.dag_id,
@@ -2399,6 +2428,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
             configuration=self.configuration,
             run_after=hook.get_run_after_or_logical_date(context),
             force_rerun=self.force_rerun,
+            try_number=ti_try_number,
         )
 
         try:
@@ -2415,20 +2445,32 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
                 job_id=self.job_id,
             )
 
-            if job.state not in self.reattach_states:
-                # Same job configuration, so we need force_rerun
-                raise AirflowException(
-                    f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
-                    f"want to force rerun it consider setting `force_rerun=True`."
-                    f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
-                )
+            # This block handles cases where job_id is None and a 429 error occurs.
+            # A new job_id will be generated because BigQuery does not allow rerunning an existing job once it reaches
+            # the DONE state. A 429 error can occur because many BigQueryInsertJobOperators are running in parallel
+            # and hitting quota limits. However, over time, other clients will finish their jobs, making it
+            # possible to execute a new job.
+            if (
+                job.state == "DONE"
+                and (job.error_result and "429" in job.error_result)
+                and context["ti"].try_number > 1
+            ):
+                job = self._submit_new_job_on_retry(context, hook)  # type: ignore[no-redef]
+            else:
+                if job.state not in self.reattach_states:
+                    # Same job configuration, so we need force_rerun
+                    raise AirflowException(
+                        f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
+                        f"want to force rerun it consider setting `force_rerun=True`."
+                        f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
+                    )
 
-            # Job already reached state DONE
-            if job.state == "DONE":
-                raise AirflowException("Job is already in state DONE. Can not reattach to this job.")
+                # Job already reached state DONE
+                if job.state == "DONE":
+                    raise AirflowException("Job is already in state DONE. Can not reattach to this job.")
 
-            # We are reattaching to a job
-            self.log.info("Reattaching to existing Job in state %s", job.state)
+                # We are reattaching to a job
+                self.log.info("Reattaching to existing Job in state %s", job.state)
 
         job_types = {
             LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
@@ -2477,8 +2519,8 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
             self._handle_job_error(job)
-
             return self.job_id
+
         if job.running():
             self.defer(
                 timeout=self.execution_timeout,
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 6a6fa8b3f3b..e4a65b7f8b9 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -684,6 +684,37 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
         )
         assert job_id == expected_job_id
 
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.md5")
+    @pytest.mark.parametrize(
+        ("test_dag_id", "expected_job_id", "try_number"),
+        [
+            ("test-dag-id-1.1", "airflow_test_dag_id_1_1_test_job_id_2020_01_23T00_00_00_hash", None),
+            ("test-dag-id-1.2", "airflow_test_dag_id_1_2_test_job_id_2020_01_23T00_00_00_hash_2", 2),
+            ("test-dag-id-1.3", "airflow_test_dag_id_1_3_test_job_id_2020_01_23T00_00_00_hash_5", 5),
+        ],
+        ids=["test-dag-id-1.1", "test-dag-id-1.2", "test-dag-id-1.3"],
+    )
+    def test_job_id_validity_with_try_number(self, mock_md5, test_dag_id, expected_job_id, try_number):
+        hash_ = "hash"
+        mock_md5.return_value.hexdigest.return_value = hash_
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+
+        job_id = self.hook.generate_job_id(
+            job_id=None,
+            dag_id=test_dag_id,
+            task_id="test_job_id",
+            logical_date=None,
+            configuration=configuration,
+            run_after=datetime(2020, 1, 23),
+            try_number=try_number,
+        )
+        assert job_id == expected_job_id
+
     def test_get_run_after_or_logical_date(self):
         """Test get_run_after_or_logical_date for both Airflow 3.x and pre-3.0 behavior."""
         if AIRFLOW_V_3_0_PLUS:
diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index c1eed574ad0..2655132009b 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -1592,8 +1592,127 @@ class TestBigQueryInsertJobOperator:
             run_after=ANY,
             configuration=configuration,
             force_rerun=True,
+            try_number=None,
         )
 
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_generate_job_id_with_ti_try_number(self, mock_hook):
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+
+        # Setup context with try_number > 2
+        context = MagicMock()
+        context["ti"].try_number = 3
+
+        mock_hook.return_value.get_run_after_or_logical_date.return_value = None
+        mock_hook.return_value.generate_job_id.return_value = "generated_job_id_with_try_number"
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id="generated_job_id_with_try_number", error_result=False
+        )
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=None,  # job_id must be None to trigger the try_number logic
+            project_id=TEST_GCP_PROJECT_ID,
+            force_rerun=False,
+        )
+
+        op.execute(context=context)
+
+        mock_hook.return_value.generate_job_id.assert_called_once_with(
+            job_id=None,
+            dag_id=op.dag_id,
+            task_id=op.task_id,
+            logical_date=None,
+            configuration=configuration,
+            run_after=None,
+            force_rerun=False,
+            try_number=2,
+        )
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_conflict_429_retry_resubmits_job(self, mock_hook):
+        """
+        Tests that if a Conflict occurs and the retrieved job failed with a 429 error,
+        a new job_id is generated and the job is resubmitted.
+        """
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+
+        # Setup context with try_number > 1
+        context = MagicMock()
+        context["ti"].try_number = 2
+
+        mock_hook.return_value.get_run_after_or_logical_date.return_value = None
+
+        # We need generate_job_id to return two different IDs for the two attempts
+        first_job_id = "initial_generated_job_id"
+        second_job_id = "retry_generated_job_id"
+        mock_hook.return_value.generate_job_id.side_effect = [first_job_id, second_job_id]
+
+        # First insert_job raises Conflict, second one succeeds
+        success_job = MagicMock(state="DONE", job_id=second_job_id, error_result=False)
+        mock_hook.return_value.insert_job.side_effect = [Conflict("already exists"), success_job]
+
+        # get_job returns a job in DONE state with a 429 error
+        failed_429_job = MagicMock(
+            job_id=first_job_id,
+            state="DONE",
+            error_result="Quota exceeded: Your project exceeded quota for... HTTP 429 Too Many Requests",
+        )
+        mock_hook.return_value.get_job.return_value = failed_429_job
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=None,
+            project_id=TEST_GCP_PROJECT_ID,
+            force_rerun=False,
+        )
+
+        result = op.execute(context=context)
+
+        # 1. Assert generate_job_id was called twice (initial + retry)
+        assert mock_hook.return_value.generate_job_id.call_count == 2
+
+        # Check the arguments of the second generate_job_id call (the retry)
+        mock_hook.return_value.generate_job_id.assert_called_with(
+            job_id=None,
+            dag_id=op.dag_id,
+            task_id=op.task_id,
+            logical_date=None,
+            configuration=configuration,
+            run_after=None,
+            force_rerun=False,
+            try_number=2,  # Should pass context["ti"].try_number exactly
+        )
+
+        # 2. Assert insert_job was called twice (initial submit + resubmit)
+        assert mock_hook.return_value.insert_job.call_count == 2
+        mock_hook.return_value.insert_job.assert_called_with(
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=second_job_id,  # Ensure the second call used the newly generated job_id
+            nowait=True,
+            project_id=TEST_GCP_PROJECT_ID,
+            retry=DEFAULT_RETRY,
+            timeout=None,
+        )
+
+        # 3. Assert the operator returns the final successful job_id
+        assert result == second_job_id
+
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_openlineage_events(self, mock_hook):
         job_id = "123456"

Reply via email to