This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 508d7fd4d3a (system tests): Move BigQuery streaming-buffer system test to manual-only (#67009)
508d7fd4d3a is described below
commit 508d7fd4d3af06dd84d620e4001ec6adb1c7fd0f
Author: Shahar Epstein <[email protected]>
AuthorDate: Fri May 15 20:33:34 2026 +0300
(system tests): Move BigQuery streaming-buffer system test to manual-only (#67009)
The streaming-buffer test path in example_bigquery_sensors.py can run for
up to ~90 minutes against real BigQuery (the documented streaming-buffer
flush window), which made the bigquery_sensors system test too slow for
the Google CI maintainers to run on every cycle.
Revert example_bigquery_sensors.py to its pre-#66652 form and extract the
streaming-buffer flow into a new manual-only system test
(example_bigquery_streaming_buffer_sensor.py), gated behind
RUN_MANUAL_GOOGLE_SYSTEM_TESTS — the same flag used by
example_dataproc_cancel_on_kill.py. Documentation howto-include
references for BigQueryStreamingBufferEmptySensor are repointed at the
new file.
related: #66652, #66962
---
providers/google/docs/operators/cloud/bigquery.rst | 4 +-
.../cloud/bigquery/example_bigquery_sensors.py | 85 +------------
...=> example_bigquery_streaming_buffer_sensor.py} | 139 ++++++---------------
3 files changed, 38 insertions(+), 190 deletions(-)
diff --git a/providers/google/docs/operators/cloud/bigquery.rst
b/providers/google/docs/operators/cloud/bigquery.rst
index 2008e7d03d0..db7c50b6012 100644
--- a/providers/google/docs/operators/cloud/bigquery.rst
+++ b/providers/google/docs/operators/cloud/bigquery.rst
@@ -534,7 +534,7 @@ To check that the BigQuery streaming buffer of a table is
empty you can use
This sensor is useful in ETL pipelines to ensure that recent streamed data has
been fully
processed before continuing downstream tasks.
-.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_bigquery_streaming_buffer_empty]
@@ -542,7 +542,7 @@ processed before continuing downstream tasks.
Also you can use deferrable mode in this operator if you would like to free up
the worker slots while the sensor is running.
-.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+.. exampleinclude::
/../../google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_bigquery_streaming_buffer_empty_deferred]
diff --git
a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
index cb9f218b1c2..59d5b210715 100644
---
a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+++
b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
@@ -22,11 +22,9 @@ Example Airflow DAG for Google BigQuery Sensors.
from __future__ import annotations
import os
-import time
from datetime import datetime
from airflow.models.dag import DAG
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateTableOperator,
@@ -34,16 +32,14 @@ from airflow.providers.google.cloud.operators.bigquery
import (
BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.sensors.bigquery import (
- BigQueryStreamingBufferEmptySensor,
BigQueryTableExistenceSensor,
BigQueryTablePartitionExistenceSensor,
)
try:
- from airflow.sdk import TriggerRule, task
+ from airflow.sdk import TriggerRule
except ImportError:
# Compatibility for Airflow < 3.1
- from airflow.decorators import task # type: ignore[no-redef,attr-defined]
from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
@@ -58,11 +54,6 @@ PARTITION_NAME = "{{ ds_nodash }}"
INSERT_ROWS_QUERY = f"INSERT {DATASET_NAME}.{TABLE_NAME} VALUES (42, '{{{{ ds
}}}}')"
-# DML on rows still in the streaming buffer is rejected by BigQuery, hence the
-# sensor in the streaming-insert -> sensor -> DML chain below.
-STREAMING_UPDATE_QUERY = f"UPDATE {DATASET_NAME}.{TABLE_NAME} SET value = 200
WHERE value = 100"
-STREAMING_DELETE_QUERY = f"DELETE FROM {DATASET_NAME}.{TABLE_NAME} WHERE value
= 200"
-
SCHEMA = [
{"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "ds", "type": "DATE", "mode": "NULLABLE"},
@@ -161,75 +152,6 @@ with DAG(
)
# [END howto_sensor_bigquery_table_partition_async]
- # [START howto_sensor_bigquery_streaming_buffer_empty]
- check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
- task_id="check_streaming_buffer_empty",
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- poke_interval=30,
- timeout=5400, # BigQuery flushes the streaming buffer within ~90
minutes
- )
- # [END howto_sensor_bigquery_streaming_buffer_empty]
-
- @task(task_id="streaming_insert")
- def streaming_insert(ds: str | None = None) -> None:
- hook = BigQueryHook()
- hook.insert_all(
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- rows=[{"value": 100, "ds": ds}],
- fail_on_error=True,
- )
- # BigQuery's streamingBuffer table metadata is eventually consistent:
for
- # a few seconds after a streaming insert the row is in the buffer but
- # table.streaming_buffer is still None. Wait for the metadata to catch
up
- # so check_streaming_buffer_empty does not falsely report "empty"
before
- # the buffer is reported at all. Remove once the sensor handles this
- # itself; tracked at https://github.com/apache/airflow/issues/66963
- client = hook.get_client(project_id=PROJECT_ID)
- table_uri = f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"
- for _ in range(30):
- if client.get_table(table_uri).streaming_buffer is not None:
- return
- time.sleep(2)
- raise RuntimeError("BigQuery streaming buffer metadata did not appear
within 60s")
-
- streaming_insert_task = streaming_insert()
-
- stream_update = BigQueryInsertJobOperator(
- task_id="stream_update",
- configuration={
- "query": {
- "query": STREAMING_UPDATE_QUERY,
- "useLegacySql": False,
- }
- },
- )
-
- stream_delete = BigQueryInsertJobOperator(
- task_id="stream_delete",
- configuration={
- "query": {
- "query": STREAMING_DELETE_QUERY,
- "useLegacySql": False,
- }
- },
- )
-
- # [START howto_sensor_bigquery_streaming_buffer_empty_deferred]
- check_streaming_buffer_empty_def = BigQueryStreamingBufferEmptySensor(
- task_id="check_streaming_buffer_empty_def",
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- deferrable=True,
- poke_interval=30,
- timeout=5400, # BigQuery flushes the streaming buffer within ~90
minutes
- )
- # [END howto_sensor_bigquery_streaming_buffer_empty_deferred]
-
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset",
dataset_id=DATASET_NAME,
@@ -247,11 +169,6 @@ with DAG(
check_table_partition_exists_async,
check_table_partition_exists_def,
]
- >> streaming_insert_task
- >> check_streaming_buffer_empty
- >> stream_update
- >> check_streaming_buffer_empty_def
- >> stream_delete
>> delete_dataset
)
diff --git
a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
similarity index 69%
copy from
providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
copy to
providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
index cb9f218b1c2..68035ac944e 100644
---
a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+++
b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
@@ -16,7 +16,11 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example Airflow DAG for Google BigQuery Sensors.
+Example Airflow DAG exercising ``BigQueryStreamingBufferEmptySensor``.
+
+BigQuery's streaming buffer can take up to ~90 minutes to flush, so this
+test can run for a long time end-to-end and is therefore opt-in: set
+``RUN_MANUAL_GOOGLE_SYSTEM_TESTS=1`` to run it.
"""
from __future__ import annotations
@@ -25,6 +29,8 @@ import os
import time
from datetime import datetime
+import pytest
+
from airflow.models.dag import DAG
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import (
@@ -35,8 +41,6 @@ from airflow.providers.google.cloud.operators.bigquery import
(
)
from airflow.providers.google.cloud.sensors.bigquery import (
BigQueryStreamingBufferEmptySensor,
- BigQueryTableExistenceSensor,
- BigQueryTablePartitionExistenceSensor,
)
try:
@@ -46,18 +50,18 @@ except ImportError:
from airflow.decorators import task # type: ignore[no-redef,attr-defined]
from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
+pytestmark = pytest.mark.skipif(
+ not os.environ.get("RUN_MANUAL_GOOGLE_SYSTEM_TESTS"),
+ reason="Manual-only system test: set RUN_MANUAL_GOOGLE_SYSTEM_TESTS=1 to
run.",
+)
+
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "bigquery_sensors"
+DAG_ID = "bigquery_streaming_buffer_sensor"
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}".replace("-", "_")
TABLE_NAME = f"partitioned_table_{DAG_ID}_{ENV_ID}".replace("-", "_")
-INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
-PARTITION_NAME = "{{ ds_nodash }}"
-
-INSERT_ROWS_QUERY = f"INSERT {DATASET_NAME}.{TABLE_NAME} VALUES (42, '{{{{ ds
}}}}')"
-
# DML on rows still in the streaming buffer is rejected by BigQuery, hence the
# sensor in the streaming-insert -> sensor -> DML chain below.
STREAMING_UPDATE_QUERY = f"UPDATE {DATASET_NAME}.{TABLE_NAME} SET value = 200
WHERE value = 100"
@@ -74,7 +78,7 @@ with DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "bigquery", "sensors"],
+ tags=["example", "bigquery", "sensors", "manual"],
user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},
default_args={"project_id": PROJECT_ID},
) as dag:
@@ -95,83 +99,6 @@ with DAG(
},
)
- # [START howto_sensor_bigquery_table]
- check_table_exists = BigQueryTableExistenceSensor(
- task_id="check_table_exists", project_id=PROJECT_ID,
dataset_id=DATASET_NAME, table_id=TABLE_NAME
- )
- # [END howto_sensor_bigquery_table]
-
- # [START howto_sensor_bigquery_table_defered]
- check_table_exists_def = BigQueryTableExistenceSensor(
- task_id="check_table_exists_def",
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- deferrable=True,
- )
- # [END howto_sensor_bigquery_table_defered]
-
- # [START howto_sensor_async_bigquery_table]
- check_table_exists_async = BigQueryTableExistenceSensor(
- task_id="check_table_exists_async",
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- )
- # [END howto_sensor_async_bigquery_table]
-
- execute_insert_query = BigQueryInsertJobOperator(
- task_id="execute_insert_query",
- configuration={
- "query": {
- "query": INSERT_ROWS_QUERY,
- "useLegacySql": False,
- }
- },
- )
-
- # [START howto_sensor_bigquery_table_partition]
- check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
- task_id="check_table_partition_exists",
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- partition_id=PARTITION_NAME,
- )
- # [END howto_sensor_bigquery_table_partition]
-
- # [START howto_sensor_bigquery_table_partition_defered]
- check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
- task_id="check_table_partition_exists_def",
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- partition_id=PARTITION_NAME,
- deferrable=True,
- )
- # [END howto_sensor_bigquery_table_partition_defered]
-
- # [START howto_sensor_bigquery_table_partition_async]
- check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
- task_id="check_table_partition_exists_async",
- partition_id=PARTITION_NAME,
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- )
- # [END howto_sensor_bigquery_table_partition_async]
-
- # [START howto_sensor_bigquery_streaming_buffer_empty]
- check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
- task_id="check_streaming_buffer_empty",
- project_id=PROJECT_ID,
- dataset_id=DATASET_NAME,
- table_id=TABLE_NAME,
- poke_interval=30,
- timeout=5400, # BigQuery flushes the streaming buffer within ~90
minutes
- )
- # [END howto_sensor_bigquery_streaming_buffer_empty]
-
@task(task_id="streaming_insert")
def streaming_insert(ds: str | None = None) -> None:
hook = BigQueryHook()
@@ -198,6 +125,17 @@ with DAG(
streaming_insert_task = streaming_insert()
+ # [START howto_sensor_bigquery_streaming_buffer_empty]
+ check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
+ task_id="check_streaming_buffer_empty",
+ project_id=PROJECT_ID,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME,
+ poke_interval=30,
+ timeout=5400, # BigQuery flushes the streaming buffer within ~90
minutes
+ )
+ # [END howto_sensor_bigquery_streaming_buffer_empty]
+
stream_update = BigQueryInsertJobOperator(
task_id="stream_update",
configuration={
@@ -208,16 +146,6 @@ with DAG(
},
)
- stream_delete = BigQueryInsertJobOperator(
- task_id="stream_delete",
- configuration={
- "query": {
- "query": STREAMING_DELETE_QUERY,
- "useLegacySql": False,
- }
- },
- )
-
# [START howto_sensor_bigquery_streaming_buffer_empty_deferred]
check_streaming_buffer_empty_def = BigQueryStreamingBufferEmptySensor(
task_id="check_streaming_buffer_empty_def",
@@ -230,6 +158,16 @@ with DAG(
)
# [END howto_sensor_bigquery_streaming_buffer_empty_deferred]
+ stream_delete = BigQueryInsertJobOperator(
+ task_id="stream_delete",
+ configuration={
+ "query": {
+ "query": STREAMING_DELETE_QUERY,
+ "useLegacySql": False,
+ }
+ },
+ )
+
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset",
dataset_id=DATASET_NAME,
@@ -240,13 +178,6 @@ with DAG(
(
create_dataset
>> create_table
- >> [check_table_exists, check_table_exists_async,
check_table_exists_def]
- >> execute_insert_query
- >> [
- check_table_partition_exists,
- check_table_partition_exists_async,
- check_table_partition_exists_def,
- ]
>> streaming_insert_task
>> check_streaming_buffer_empty
>> stream_update