This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d74039c0751 fix `BigQueryStreamingBufferEmptySensor.poke()` always raising `NotFound` (#66962)
d74039c0751 is described below
commit d74039c075138a43bde41663d8f82690abd53a73
Author: Shahar Epstein <[email protected]>
AuthorDate: Fri May 15 07:09:58 2026 +0300
fix `BigQueryStreamingBufferEmptySensor.poke()` always raising `NotFound` (#66962)
* Fix BigQueryStreamingBufferEmptySensor failing against real BigQuery
The sensor passed the legacy ``project:dataset.table`` string to
``Client.get_table``, which only accepts a ``TableReference`` or a
standard-SQL ``project.dataset.table`` string. Against real BigQuery the
poke always raised ``NotFound``, so the system test DAG could never get
past ``check_streaming_buffer_empty``. Build an explicit
``TableReference`` instead, mirroring ``BigQueryHook.insert_all``.
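As a rough sketch of the difference, using the google-cloud-bigquery client directly (the project, dataset, and table names below are placeholders, not the system test's real ones):

    from google.cloud.bigquery import Client, DatasetReference, TableReference

    client = Client(project="my-project")

    # Legacy ``project:dataset.table`` form -- not accepted by Client.get_table:
    #   client.get_table("my-project:my_dataset.my_table")

    # A standard-SQL string or an explicit reference both work; the sensor now
    # builds the reference explicitly:
    table_ref = TableReference(
        dataset_ref=DatasetReference("my-project", "my_dataset"),
        table_id="my_table",
    )
    table = client.get_table(table_ref)
    print(table.streaming_buffer)  # None once the streaming buffer is empty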
Co-Authored-By: Claude Opus 4.7 <[email protected]>
* Fix streaming_insert in BigQuery sensors system test
The streaming_insert task passed rows in the {"json": {...}} REST-API
envelope to BigQueryHook.insert_all, which feeds them to the client
library's insert_rows -- that expects plain row dicts, so the insert
failed with "no such field: json" and no row was ever streamed. Pass
plain row dicts instead, and set fail_on_error=True so a broken
streaming insert fails the task loudly instead of silently no-oping.
The misleading {"json": {...}} format came straight from the insert_all
docstring, so fix that (and its unit-test fixtures) too.
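For illustration, the corrected call shape looks roughly like this (identifiers are placeholders):

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    # Plain row dicts -- column name to value, matching the table schema --
    # rather than the REST-API {"json": {...}} envelope; fail_on_error=True
    # raises AirflowException if the streaming insert reports errors.
    BigQueryHook().insert_all(
        project_id="my-project",
        dataset_id="my_dataset",
        table_id="my_table",
        rows=[{"value": 100, "ds": "2026-05-15"}],
        fail_on_error=True,
    )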
Co-Authored-By: Claude Opus 4.7 <[email protected]>
* Make BigQuery sensors system test resilient to streaming buffer metadata
lag
BigQuery's streamingBuffer table metadata is eventually consistent: for a
few seconds after a streaming insert the row is in the buffer but
table.streaming_buffer is still None, so check_streaming_buffer_empty can
falsely pass before the buffer is reported at all. The streaming_insert
task now waits for the metadata to appear before completing, making the
system test deterministic.
This is a workaround for a known sensor limitation, documented in the
sensor docstring and tracked at
https://github.com/apache/airflow/issues/66963
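A condensed sketch of that wait loop (placeholder identifiers, shown outside the DAG context):

    import time

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    client = BigQueryHook().get_client(project_id="my-project")
    # Poll table metadata until BigQuery reports the streaming buffer.
    for _ in range(30):
        if client.get_table("my-project.my_dataset.my_table").streaming_buffer is not None:
            break
        time.sleep(2)
    else:
        raise RuntimeError("streaming buffer metadata did not appear within 60s")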
Co-Authored-By: Claude Opus 4.7 <[email protected]>
---------
Co-authored-by: Claude Opus 4.7 <[email protected]>
---
.../airflow/providers/google/cloud/hooks/bigquery.py | 5 +++--
.../providers/google/cloud/sensors/bigquery.py | 19 ++++++++++++++++++-
.../cloud/bigquery/example_bigquery_sensors.py | 20 ++++++++++++++++++--
.../tests/unit/google/cloud/hooks/test_bigquery.py | 6 +++---
.../tests/unit/google/cloud/sensors/test_bigquery.py | 13 ++++++++++---
5 files changed, 52 insertions(+), 11 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 1aa72d81935..daf46361985 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -712,11 +712,12 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
:param project_id: The name of the project where we have the table
:param dataset_id: The name of the dataset where we have the table
:param table_id: The name of the table
- :param rows: the rows to insert
+        :param rows: the rows to insert. Each row is a mapping of column name to
+ value, matching the table schema.
.. code-block:: python
-        rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
+ rows = [{"a_key": "a_value_0"}, {"a_key": "a_value_1"}]
:param ignore_unknown_values: [Optional] Accept rows that contain values
that do not match the schema. The unknown values are ignored.
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
index aa40186df60..5954d30abdb 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
@@ -25,6 +25,7 @@ from datetime import timedelta
from typing import TYPE_CHECKING, Any
from google.api_core.exceptions import NotFound
+from google.cloud.bigquery import DatasetReference, TableReference
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.compat.sdk import AirflowException, BaseSensorOperator, conf
@@ -332,6 +333,14 @@ class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
``UPDATE/MERGE/DELETE statement over table ... would affect rows in the
streaming buffer`` errors.
+ .. warning::
+ The sensor reads ``table.streaming_buffer`` from BigQuery's table
+ metadata, which is eventually consistent. For a short window right
+ after a streaming insert the buffer metadata is still absent, so the
+ sensor may report the buffer empty before it actually is. Known
+ limitation tracked at
+ https://github.com/apache/airflow/issues/66963
+
:param project_id: Google Cloud project containing the table.
:param dataset_id: Dataset of the table to monitor.
:param table_id: Table to monitor.
@@ -404,9 +413,17 @@ class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
self.log.info("Checking streaming buffer state for table: %s", table_uri)
+        # ``Client.get_table`` only accepts a ``TableReference`` or a standard-SQL
+        # ``project.dataset.table`` string -- not the legacy ``project:dataset.table``
+ # form -- so build an explicit reference here.
+ table_ref = TableReference(
+ dataset_ref=DatasetReference(self.project_id, self.dataset_id),
+ table_id=self.table_id,
+ )
+
hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
try:
-            table = hook.get_client(project_id=self.project_id).get_table(table_uri)
+            table = hook.get_client(project_id=self.project_id).get_table(table_ref)
except NotFound as err:
raise ValueError(f"Table {table_uri} not found") from err
return table.streaming_buffer is None
diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
index 849b978d4b6..cb9f218b1c2 100644
--- a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
@@ -22,6 +22,7 @@ Example Airflow DAG for Google BigQuery Sensors.
from __future__ import annotations
import os
+import time
from datetime import datetime
from airflow.models.dag import DAG
@@ -173,12 +174,27 @@ with DAG(
@task(task_id="streaming_insert")
def streaming_insert(ds: str | None = None) -> None:
- BigQueryHook().insert_all(
+ hook = BigQueryHook()
+ hook.insert_all(
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
- rows=[{"json": {"value": 100, "ds": ds}}],
+ rows=[{"value": 100, "ds": ds}],
+ fail_on_error=True,
)
+        # BigQuery's streamingBuffer table metadata is eventually consistent: for
+        # a few seconds after a streaming insert the row is in the buffer but
+        # table.streaming_buffer is still None. Wait for the metadata to catch up
+        # so check_streaming_buffer_empty does not falsely report "empty" before
+ # the buffer is reported at all. Remove once the sensor handles this
+ # itself; tracked at https://github.com/apache/airflow/issues/66963
+ client = hook.get_client(project_id=PROJECT_ID)
+ table_uri = f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"
+ for _ in range(30):
+ if client.get_table(table_uri).streaming_buffer is not None:
+ return
+ time.sleep(2)
+        raise RuntimeError("BigQuery streaming buffer metadata did not appear within 60s")
streaming_insert_task = streaming_insert()
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index ddda199849c..e750b42084c 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -600,7 +600,7 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
def test_insert_all_succeed(self, mock_client):
- rows = [{"json": {"a_key": "a_value_0"}}]
+ rows = [{"a_key": "a_value_0"}]
self.hook.insert_all(
project_id=PROJECT_ID,
@@ -620,7 +620,7 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
def test_insert_all_fail(self, mock_client):
- rows = [{"json": {"a_key": "a_value_0"}}]
+ rows = [{"a_key": "a_value_0"}]
mock_client.return_value.insert_rows.return_value = ["some", "errors"]
with pytest.raises(AirflowException, match="insert error"):
@@ -2204,7 +2204,7 @@ class TestHookLevelLineage(_BigQueryBaseTestClass):
project_id=PROJECT_ID,
dataset_id=DATASET_ID,
table_id=TABLE_ID,
- rows=[{"json": {"a_key": "a_value"}}],
+ rows=[{"a_key": "a_value"}],
)
assert len(hook_lineage_collector.collected_assets.inputs) == 0
diff --git a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
index bba88a91025..1881c360e5e 100644
--- a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
@@ -20,6 +20,7 @@ from unittest import mock
import pytest
from google.api_core.exceptions import NotFound
+from google.cloud.bigquery import TableReference
from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
from airflow.providers.google.cloud.sensors.bigquery import (
@@ -323,9 +324,15 @@ class TestBigQueryStreamingBufferEmptySensor:
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
mock_hook.return_value.get_client.assert_called_once_with(project_id=TEST_PROJECT_ID)
-        mock_hook.return_value.get_client.return_value.get_table.assert_called_once_with(
-            f"{TEST_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
-        )
+        get_table_call = mock_hook.return_value.get_client.return_value.get_table
+ get_table_call.assert_called_once()
+ # ``Client.get_table`` is given a ``TableReference``, never the legacy
+ # ``project:dataset.table`` string, which the BigQuery client rejects.
+ table_ref = get_table_call.call_args.args[0]
+ assert isinstance(table_ref, TableReference)
+ assert table_ref.project == TEST_PROJECT_ID
+ assert table_ref.dataset_id == TEST_DATASET_ID
+ assert table_ref.table_id == TEST_TABLE_ID
@mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
def test_poke_returns_false_when_buffer_present(self, mock_hook):