This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 508d7fd4d3a (system tests): Move BigQuery streaming-buffer system test to manual-only (#67009)
508d7fd4d3a is described below

commit 508d7fd4d3af06dd84d620e4001ec6adb1c7fd0f
Author: Shahar Epstein <[email protected]>
AuthorDate: Fri May 15 20:33:34 2026 +0300

    (system tests): Move BigQuery streaming-buffer system test to manual-only (#67009)
    
    The streaming-buffer test path in example_bigquery_sensors.py can run for
    up to ~90 minutes against real BigQuery (the documented streaming-buffer
    flush window), which made the bigquery_sensors system test too slow for
    the Google CI maintainers to run on every cycle.
    
    Revert example_bigquery_sensors.py to its pre-#66652 form and extract the
    streaming-buffer flow into a new manual-only system test
    (example_bigquery_streaming_buffer_sensor.py), gated behind
    RUN_MANUAL_GOOGLE_SYSTEM_TESTS — the same flag used by
    example_dataproc_cancel_on_kill.py. Documentation howto-include
    references for BigQueryStreamingBufferEmptySensor are repointed at the
    new file.
    
    related: #66652, #66962
---
 providers/google/docs/operators/cloud/bigquery.rst |   4 +-
 .../cloud/bigquery/example_bigquery_sensors.py     |  85 +------------
 ...=> example_bigquery_streaming_buffer_sensor.py} | 139 ++++++---------------
 3 files changed, 38 insertions(+), 190 deletions(-)

diff --git a/providers/google/docs/operators/cloud/bigquery.rst b/providers/google/docs/operators/cloud/bigquery.rst
index 2008e7d03d0..db7c50b6012 100644
--- a/providers/google/docs/operators/cloud/bigquery.rst
+++ b/providers/google/docs/operators/cloud/bigquery.rst
@@ -534,7 +534,7 @@ To check that the BigQuery streaming buffer of a table is empty you can use
 This sensor is useful in ETL pipelines to ensure that recent streamed data has been fully
 processed before continuing downstream tasks.
 
-.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_bigquery_streaming_buffer_empty]
@@ -542,7 +542,7 @@ processed before continuing downstream tasks.
 
 Also you can use deferrable mode in this operator if you would like to free up the worker slots while the sensor is running.
 
-.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_bigquery_streaming_buffer_empty_deferred]
diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
index cb9f218b1c2..59d5b210715 100644
--- a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
@@ -22,11 +22,9 @@ Example Airflow DAG for Google BigQuery Sensors.
 from __future__ import annotations
 
 import os
-import time
 from datetime import datetime
 
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateTableOperator,
@@ -34,16 +32,14 @@ from airflow.providers.google.cloud.operators.bigquery 
import (
     BigQueryInsertJobOperator,
 )
 from airflow.providers.google.cloud.sensors.bigquery import (
-    BigQueryStreamingBufferEmptySensor,
     BigQueryTableExistenceSensor,
     BigQueryTablePartitionExistenceSensor,
 )
 
 try:
-    from airflow.sdk import TriggerRule, task
+    from airflow.sdk import TriggerRule
 except ImportError:
     # Compatibility for Airflow < 3.1
-    from airflow.decorators import task  # type: ignore[no-redef,attr-defined]
     from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
@@ -58,11 +54,6 @@ PARTITION_NAME = "{{ ds_nodash }}"
 
 INSERT_ROWS_QUERY = f"INSERT {DATASET_NAME}.{TABLE_NAME} VALUES (42, '{{{{ ds 
}}}}')"
 
-# DML on rows still in the streaming buffer is rejected by BigQuery, hence the
-# sensor in the streaming-insert -> sensor -> DML chain below.
-STREAMING_UPDATE_QUERY = f"UPDATE {DATASET_NAME}.{TABLE_NAME} SET value = 200 
WHERE value = 100"
-STREAMING_DELETE_QUERY = f"DELETE FROM {DATASET_NAME}.{TABLE_NAME} WHERE value 
= 200"
-
 SCHEMA = [
     {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
     {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
@@ -161,75 +152,6 @@ with DAG(
     )
     # [END howto_sensor_bigquery_table_partition_async]
 
-    # [START howto_sensor_bigquery_streaming_buffer_empty]
-    check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
-        task_id="check_streaming_buffer_empty",
-        project_id=PROJECT_ID,
-        dataset_id=DATASET_NAME,
-        table_id=TABLE_NAME,
-        poke_interval=30,
-        timeout=5400,  # BigQuery flushes the streaming buffer within ~90 
minutes
-    )
-    # [END howto_sensor_bigquery_streaming_buffer_empty]
-
-    @task(task_id="streaming_insert")
-    def streaming_insert(ds: str | None = None) -> None:
-        hook = BigQueryHook()
-        hook.insert_all(
-            project_id=PROJECT_ID,
-            dataset_id=DATASET_NAME,
-            table_id=TABLE_NAME,
-            rows=[{"value": 100, "ds": ds}],
-            fail_on_error=True,
-        )
-        # BigQuery's streamingBuffer table metadata is eventually consistent: 
for
-        # a few seconds after a streaming insert the row is in the buffer but
-        # table.streaming_buffer is still None. Wait for the metadata to catch 
up
-        # so check_streaming_buffer_empty does not falsely report "empty" 
before
-        # the buffer is reported at all. Remove once the sensor handles this
-        # itself; tracked at https://github.com/apache/airflow/issues/66963
-        client = hook.get_client(project_id=PROJECT_ID)
-        table_uri = f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"
-        for _ in range(30):
-            if client.get_table(table_uri).streaming_buffer is not None:
-                return
-            time.sleep(2)
-        raise RuntimeError("BigQuery streaming buffer metadata did not appear 
within 60s")
-
-    streaming_insert_task = streaming_insert()
-
-    stream_update = BigQueryInsertJobOperator(
-        task_id="stream_update",
-        configuration={
-            "query": {
-                "query": STREAMING_UPDATE_QUERY,
-                "useLegacySql": False,
-            }
-        },
-    )
-
-    stream_delete = BigQueryInsertJobOperator(
-        task_id="stream_delete",
-        configuration={
-            "query": {
-                "query": STREAMING_DELETE_QUERY,
-                "useLegacySql": False,
-            }
-        },
-    )
-
-    # [START howto_sensor_bigquery_streaming_buffer_empty_deferred]
-    check_streaming_buffer_empty_def = BigQueryStreamingBufferEmptySensor(
-        task_id="check_streaming_buffer_empty_def",
-        project_id=PROJECT_ID,
-        dataset_id=DATASET_NAME,
-        table_id=TABLE_NAME,
-        deferrable=True,
-        poke_interval=30,
-        timeout=5400,  # BigQuery flushes the streaming buffer within ~90 
minutes
-    )
-    # [END howto_sensor_bigquery_streaming_buffer_empty_deferred]
-
     delete_dataset = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset",
         dataset_id=DATASET_NAME,
@@ -247,11 +169,6 @@ with DAG(
             check_table_partition_exists_async,
             check_table_partition_exists_def,
         ]
-        >> streaming_insert_task
-        >> check_streaming_buffer_empty
-        >> stream_update
-        >> check_streaming_buffer_empty_def
-        >> stream_delete
         >> delete_dataset
     )
 
diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
similarity index 69%
copy from providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
copy to providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
index cb9f218b1c2..68035ac944e 100644
--- a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
+++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py
@@ -16,7 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG for Google BigQuery Sensors.
+Example Airflow DAG exercising ``BigQueryStreamingBufferEmptySensor``.
+
+BigQuery's streaming buffer can take up to ~90 minutes to flush, so this
+test can run for a long time end-to-end and is therefore opt-in: set
+``RUN_MANUAL_GOOGLE_SYSTEM_TESTS=1`` to run it.
 """
 
 from __future__ import annotations
@@ -25,6 +29,8 @@ import os
 import time
 from datetime import datetime
 
+import pytest
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.google.cloud.operators.bigquery import (
@@ -35,8 +41,6 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
 )
 from airflow.providers.google.cloud.sensors.bigquery import (
     BigQueryStreamingBufferEmptySensor,
-    BigQueryTableExistenceSensor,
-    BigQueryTablePartitionExistenceSensor,
 )
 
 try:
@@ -46,18 +50,18 @@ except ImportError:
     from airflow.decorators import task  # type: ignore[no-redef,attr-defined]
     from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
 
+pytestmark = pytest.mark.skipif(
+    not os.environ.get("RUN_MANUAL_GOOGLE_SYSTEM_TESTS"),
+    reason="Manual-only system test: set RUN_MANUAL_GOOGLE_SYSTEM_TESTS=1 to run.",
+)
+
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "bigquery_sensors"
+DAG_ID = "bigquery_streaming_buffer_sensor"
 
 DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}".replace("-", "_")
 TABLE_NAME = f"partitioned_table_{DAG_ID}_{ENV_ID}".replace("-", "_")
 
-INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
-PARTITION_NAME = "{{ ds_nodash }}"
-
-INSERT_ROWS_QUERY = f"INSERT {DATASET_NAME}.{TABLE_NAME} VALUES (42, '{{{{ ds 
}}}}')"
-
 # DML on rows still in the streaming buffer is rejected by BigQuery, hence the
 # sensor in the streaming-insert -> sensor -> DML chain below.
 STREAMING_UPDATE_QUERY = f"UPDATE {DATASET_NAME}.{TABLE_NAME} SET value = 200 
WHERE value = 100"
@@ -74,7 +78,7 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "bigquery", "sensors"],
+    tags=["example", "bigquery", "sensors", "manual"],
     user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},
     default_args={"project_id": PROJECT_ID},
 ) as dag:
@@ -95,83 +99,6 @@ with DAG(
         },
     )
 
-    # [START howto_sensor_bigquery_table]
-    check_table_exists = BigQueryTableExistenceSensor(
-        task_id="check_table_exists", project_id=PROJECT_ID, 
dataset_id=DATASET_NAME, table_id=TABLE_NAME
-    )
-    # [END howto_sensor_bigquery_table]
-
-    # [START howto_sensor_bigquery_table_defered]
-    check_table_exists_def = BigQueryTableExistenceSensor(
-        task_id="check_table_exists_def",
-        project_id=PROJECT_ID,
-        dataset_id=DATASET_NAME,
-        table_id=TABLE_NAME,
-        deferrable=True,
-    )
-    # [END howto_sensor_bigquery_table_defered]
-
-    # [START howto_sensor_async_bigquery_table]
-    check_table_exists_async = BigQueryTableExistenceSensor(
-        task_id="check_table_exists_async",
-        project_id=PROJECT_ID,
-        dataset_id=DATASET_NAME,
-        table_id=TABLE_NAME,
-    )
-    # [END howto_sensor_async_bigquery_table]
-
-    execute_insert_query = BigQueryInsertJobOperator(
-        task_id="execute_insert_query",
-        configuration={
-            "query": {
-                "query": INSERT_ROWS_QUERY,
-                "useLegacySql": False,
-            }
-        },
-    )
-
-    # [START howto_sensor_bigquery_table_partition]
-    check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
-        task_id="check_table_partition_exists",
-        project_id=PROJECT_ID,
-        dataset_id=DATASET_NAME,
-        table_id=TABLE_NAME,
-        partition_id=PARTITION_NAME,
-    )
-    # [END howto_sensor_bigquery_table_partition]
-
-    # [START howto_sensor_bigquery_table_partition_defered]
-    check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
-        task_id="check_table_partition_exists_def",
-        project_id=PROJECT_ID,
-        dataset_id=DATASET_NAME,
-        table_id=TABLE_NAME,
-        partition_id=PARTITION_NAME,
-        deferrable=True,
-    )
-    # [END howto_sensor_bigquery_table_partition_defered]
-
-    # [START howto_sensor_bigquery_table_partition_async]
-    check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
-        task_id="check_table_partition_exists_async",
-        partition_id=PARTITION_NAME,
-        project_id=PROJECT_ID,
-        dataset_id=DATASET_NAME,
-        table_id=TABLE_NAME,
-    )
-    # [END howto_sensor_bigquery_table_partition_async]
-
-    # [START howto_sensor_bigquery_streaming_buffer_empty]
-    check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
-        task_id="check_streaming_buffer_empty",
-        project_id=PROJECT_ID,
-        dataset_id=DATASET_NAME,
-        table_id=TABLE_NAME,
-        poke_interval=30,
-        timeout=5400,  # BigQuery flushes the streaming buffer within ~90 
minutes
-    )
-    # [END howto_sensor_bigquery_streaming_buffer_empty]
-
     @task(task_id="streaming_insert")
     def streaming_insert(ds: str | None = None) -> None:
         hook = BigQueryHook()
@@ -198,6 +125,17 @@ with DAG(
 
     streaming_insert_task = streaming_insert()
 
+    # [START howto_sensor_bigquery_streaming_buffer_empty]
+    check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
+        task_id="check_streaming_buffer_empty",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_NAME,
+        poke_interval=30,
+        timeout=5400,  # BigQuery flushes the streaming buffer within ~90 minutes
+    )
+    # [END howto_sensor_bigquery_streaming_buffer_empty]
+
     stream_update = BigQueryInsertJobOperator(
         task_id="stream_update",
         configuration={
@@ -208,16 +146,6 @@ with DAG(
         },
     )
 
-    stream_delete = BigQueryInsertJobOperator(
-        task_id="stream_delete",
-        configuration={
-            "query": {
-                "query": STREAMING_DELETE_QUERY,
-                "useLegacySql": False,
-            }
-        },
-    )
-
     # [START howto_sensor_bigquery_streaming_buffer_empty_deferred]
     check_streaming_buffer_empty_def = BigQueryStreamingBufferEmptySensor(
         task_id="check_streaming_buffer_empty_def",
@@ -230,6 +158,16 @@ with DAG(
     )
     # [END howto_sensor_bigquery_streaming_buffer_empty_deferred]
 
+    stream_delete = BigQueryInsertJobOperator(
+        task_id="stream_delete",
+        configuration={
+            "query": {
+                "query": STREAMING_DELETE_QUERY,
+                "useLegacySql": False,
+            }
+        },
+    )
+
     delete_dataset = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset",
         dataset_id=DATASET_NAME,
@@ -240,13 +178,6 @@ with DAG(
     (
         create_dataset
         >> create_table
-        >> [check_table_exists, check_table_exists_async, 
check_table_exists_def]
-        >> execute_insert_query
-        >> [
-            check_table_partition_exists,
-            check_table_partition_exists_async,
-            check_table_partition_exists_def,
-        ]
         >> streaming_insert_task
         >> check_streaming_buffer_empty
         >> stream_update

Reply via email to