This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d74039c0751 fix `BigQueryStreamingBufferEmptySensor.poke()` always raise `NotFound` (#66962)
d74039c0751 is described below

commit d74039c075138a43bde41663d8f82690abd53a73
Author: Shahar Epstein <[email protected]>
AuthorDate: Fri May 15 07:09:58 2026 +0300

    fix `BigQueryStreamingBufferEmptySensor.poke()` always raise `NotFound` (#66962)
    
    * Fix BigQueryStreamingBufferEmptySensor failing against real BigQuery
    
    The sensor passed the legacy ``project:dataset.table`` string to
    ``Client.get_table``, which only accepts a ``TableReference`` or a
    standard-SQL ``project.dataset.table`` string. Against real BigQuery the
    poke always raised ``NotFound``, so the system test DAG could never get
    past ``check_streaming_buffer_empty``. Build an explicit
    ``TableReference`` instead, mirroring ``BigQueryHook.insert_all``.
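    
    A minimal sketch of the distinction (the project, dataset and table names here
    are illustrative, not taken from the commit)::
    
        from google.cloud.bigquery import Client, DatasetReference, TableReference
    
        client = Client(project="my-project")
    
        # Legacy colon-separated form; Client.get_table does not understand it,
        # so the lookup fails even when the table exists.
        # client.get_table("my-project:my_dataset.my_table")
    
        # An explicit reference (or the standard-SQL "my-project.my_dataset.my_table"
        # string) resolves the same table correctly.
        table_ref = TableReference(DatasetReference("my-project", "my_dataset"), "my_table")
        table = client.get_table(table_ref)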
    
    Co-Authored-By: Claude Opus 4.7 <[email protected]>
    
    * Fix streaming_insert in BigQuery sensors system test
    
    The streaming_insert task passed rows in the {"json": {...}} REST-API
    envelope to BigQueryHook.insert_all, which feeds them to the client
    library's insert_rows -- that expects plain row dicts, so the insert
    failed with "no such field: json" and no row was ever streamed. Pass
    plain row dicts instead, and set fail_on_error=True so a broken
    streaming insert fails the task loudly instead of silently no-oping.
    
    The misleading {"json": {...}} format came straight from the insert_all
    docstring, so fix that (and its unit-test fixtures) too.
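    
    The corrected call shape, sketched with illustrative project/dataset/table
    names and a table that has ``value`` and ``ds`` columns::
    
        from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
    
        hook = BigQueryHook()
        hook.insert_all(
            project_id="my-project",
            dataset_id="my_dataset",
            table_id="my_table",
            rows=[{"value": 100, "ds": "2026-05-15"}],  # plain row dicts, no {"json": ...} envelope
            fail_on_error=True,  # raise instead of silently logging streaming-insert errors
        )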
    
    Co-Authored-By: Claude Opus 4.7 <[email protected]>
    
    * Make BigQuery sensors system test resilient to streaming buffer metadata lag
    
    BigQuery's streamingBuffer table metadata is eventually consistent: for a
    few seconds after a streaming insert the row is in the buffer but
    table.streaming_buffer is still None, so check_streaming_buffer_empty can
    falsely pass before the buffer is reported at all. The streaming_insert
    task now waits for the metadata to appear before completing, making the
    system test deterministic.
    
    This is a workaround for a known sensor limitation, documented in the
    sensor docstring and tracked at
    https://github.com/apache/airflow/issues/66963
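    
    The shape of the wait, mirroring the system-test change below (names are
    illustrative)::
    
        import time
    
        from google.cloud.bigquery import Client
    
        client = Client(project="my-project")
        for _ in range(30):  # poll for up to ~60s until the buffer metadata appears
            if client.get_table("my-project.my_dataset.my_table").streaming_buffer is not None:
                break
            time.sleep(2)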
    
    Co-Authored-By: Claude Opus 4.7 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 <[email protected]>
---
 .../airflow/providers/google/cloud/hooks/bigquery.py |  5 +++--
 .../providers/google/cloud/sensors/bigquery.py       | 19 ++++++++++++++++++-
 .../cloud/bigquery/example_bigquery_sensors.py       | 20 ++++++++++++++++++--
 .../tests/unit/google/cloud/hooks/test_bigquery.py   |  6 +++---
 .../tests/unit/google/cloud/sensors/test_bigquery.py | 13 ++++++++++---
 5 files changed, 52 insertions(+), 11 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 1aa72d81935..daf46361985 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -712,11 +712,12 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         :param project_id: The name of the project where we have the table
         :param dataset_id: The name of the dataset where we have the table
         :param table_id: The name of the table
-        :param rows: the rows to insert
+        :param rows: the rows to insert. Each row is a mapping of column name to
+            value, matching the table schema.
 
             .. code-block:: python
 
-                rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
+                rows = [{"a_key": "a_value_0"}, {"a_key": "a_value_1"}]
 
         :param ignore_unknown_values: [Optional] Accept rows that contain values
             that do not match the schema. The unknown values are ignored.
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
index aa40186df60..5954d30abdb 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
@@ -25,6 +25,7 @@ from datetime import timedelta
 from typing import TYPE_CHECKING, Any
 
 from google.api_core.exceptions import NotFound
+from google.cloud.bigquery import DatasetReference, TableReference
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.common.compat.sdk import AirflowException, BaseSensorOperator, conf
@@ -332,6 +333,14 @@ class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
     ``UPDATE/MERGE/DELETE statement over table ... would affect rows in the
     streaming buffer`` errors.
 
+    .. warning::
+        The sensor reads ``table.streaming_buffer`` from BigQuery's table
+        metadata, which is eventually consistent. For a short window right
+        after a streaming insert the buffer metadata is still absent, so the
+        sensor may report the buffer empty before it actually is. Known
+        limitation tracked at
+        https://github.com/apache/airflow/issues/66963
+
     :param project_id: Google Cloud project containing the table.
     :param dataset_id: Dataset of the table to monitor.
     :param table_id: Table to monitor.
@@ -404,9 +413,17 @@ class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
         table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
         self.log.info("Checking streaming buffer state for table: %s", table_uri)
 
+        # ``Client.get_table`` only accepts a ``TableReference`` or a standard-SQL
+        # ``project.dataset.table`` string -- not the legacy ``project:dataset.table``
+        # form -- so build an explicit reference here.
+        table_ref = TableReference(
+            dataset_ref=DatasetReference(self.project_id, self.dataset_id),
+            table_id=self.table_id,
+        )
+
         hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
         try:
-            table = hook.get_client(project_id=self.project_id).get_table(table_uri)
+            table = hook.get_client(project_id=self.project_id).get_table(table_ref)
         except NotFound as err:
             raise ValueError(f"Table {table_uri} not found") from err
         return table.streaming_buffer is None
diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
index 849b978d4b6..cb9f218b1c2 100644
--- a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
@@ -22,6 +22,7 @@ Example Airflow DAG for Google BigQuery Sensors.
 from __future__ import annotations
 
 import os
+import time
 from datetime import datetime
 
 from airflow.models.dag import DAG
@@ -173,12 +174,27 @@ with DAG(
 
     @task(task_id="streaming_insert")
     def streaming_insert(ds: str | None = None) -> None:
-        BigQueryHook().insert_all(
+        hook = BigQueryHook()
+        hook.insert_all(
             project_id=PROJECT_ID,
             dataset_id=DATASET_NAME,
             table_id=TABLE_NAME,
-            rows=[{"json": {"value": 100, "ds": ds}}],
+            rows=[{"value": 100, "ds": ds}],
+            fail_on_error=True,
         )
+        # BigQuery's streamingBuffer table metadata is eventually consistent: for
+        # a few seconds after a streaming insert the row is in the buffer but
+        # table.streaming_buffer is still None. Wait for the metadata to catch up
+        # so check_streaming_buffer_empty does not falsely report "empty" before
+        # the buffer is reported at all. Remove once the sensor handles this
+        # itself; tracked at https://github.com/apache/airflow/issues/66963
+        client = hook.get_client(project_id=PROJECT_ID)
+        table_uri = f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"
+        for _ in range(30):
+            if client.get_table(table_uri).streaming_buffer is not None:
+                return
+            time.sleep(2)
+        raise RuntimeError("BigQuery streaming buffer metadata did not appear within 60s")
 
     streaming_insert_task = streaming_insert()
 
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index ddda199849c..e750b42084c 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -600,7 +600,7 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
 
     @mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
     def test_insert_all_succeed(self, mock_client):
-        rows = [{"json": {"a_key": "a_value_0"}}]
+        rows = [{"a_key": "a_value_0"}]
 
         self.hook.insert_all(
             project_id=PROJECT_ID,
@@ -620,7 +620,7 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
 
     @mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
     def test_insert_all_fail(self, mock_client):
-        rows = [{"json": {"a_key": "a_value_0"}}]
+        rows = [{"a_key": "a_value_0"}]
 
         mock_client.return_value.insert_rows.return_value = ["some", "errors"]
         with pytest.raises(AirflowException, match="insert error"):
@@ -2204,7 +2204,7 @@ class TestHookLevelLineage(_BigQueryBaseTestClass):
             project_id=PROJECT_ID,
             dataset_id=DATASET_ID,
             table_id=TABLE_ID,
-            rows=[{"json": {"a_key": "a_value"}}],
+            rows=[{"a_key": "a_value"}],
         )
 
         assert len(hook_lineage_collector.collected_assets.inputs) == 0
diff --git a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
index bba88a91025..1881c360e5e 100644
--- a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
@@ -20,6 +20,7 @@ from unittest import mock
 
 import pytest
 from google.api_core.exceptions import NotFound
+from google.cloud.bigquery import TableReference
 
 from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
 from airflow.providers.google.cloud.sensors.bigquery import (
@@ -323,9 +324,15 @@ class TestBigQueryStreamingBufferEmptySensor:
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
         mock_hook.return_value.get_client.assert_called_once_with(project_id=TEST_PROJECT_ID)
-        mock_hook.return_value.get_client.return_value.get_table.assert_called_once_with(
-            f"{TEST_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
-        )
+        get_table_call = mock_hook.return_value.get_client.return_value.get_table
+        get_table_call.assert_called_once()
+        # ``Client.get_table`` is given a ``TableReference``, never the legacy
+        # ``project:dataset.table`` string, which the BigQuery client rejects.
+        table_ref = get_table_call.call_args.args[0]
+        assert isinstance(table_ref, TableReference)
+        assert table_ref.project == TEST_PROJECT_ID
+        assert table_ref.dataset_id == TEST_DATASET_ID
+        assert table_ref.table_id == TEST_TABLE_ID
 
     @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
     def test_poke_returns_false_when_buffer_present(self, mock_hook):
