MaksYermak commented on code in PR #61148:
URL: https://github.com/apache/airflow/pull/61148#discussion_r2815604019


##########
providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py:
##########
@@ -152,22 +162,105 @@
     )
     # [END howto_sensor_bigquery_table_partition_async]
 
+    # [START howto_sensor_bigquery_streaming_buffer_empty]
+    check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
+        task_id="check_streaming_buffer_empty",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_NAME,
+        poke_interval=30,
+        timeout=5400,  # 90 minutes - Google Cloud flushes streaming buffer within 90 minutes
+    )
+    # [END howto_sensor_bigquery_streaming_buffer_empty]
+
+    # Streaming operations: INSERT, UPDATE, DELETE
+    # These operations write data to the streaming buffer before being flushed to persistent storage
+    stream_insert = BigQueryInsertJobOperator(
+        task_id="stream_insert",
+        configuration={
+            "query": {
+                "query": STREAMING_INSERT_QUERY,
+                "useLegacySql": False,
+            }
+        },
+    )
+
+    stream_update = BigQueryInsertJobOperator(
+        task_id="stream_update",
+        configuration={
+            "query": {
+                "query": STREAMING_UPDATE_QUERY,
+                "useLegacySql": False,
+            }
+        },
+    )
+
+    stream_delete = BigQueryInsertJobOperator(
+        task_id="stream_delete",
+        configuration={
+            "query": {
+                "query": STREAMING_DELETE_QUERY,
+                "useLegacySql": False,
+            }
+        },
+    )
+
+    # [START howto_sensor_bigquery_streaming_buffer_empty_deferred]
+    check_streaming_buffer_empty_def = BigQueryStreamingBufferEmptySensor(
+        task_id="check_streaming_buffer_empty_def",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_NAME,
+        deferrable=True,
+        poke_interval=30,
+        timeout=5400,  # 90 minutes - Google Cloud flushes streaming buffer within 90 minutes
+    )
+    # [END howto_sensor_bigquery_streaming_buffer_empty_deferred]
+
+    # [START howto_sensor_bigquery_streaming_buffer_empty_async]
+    check_streaming_buffer_empty_async = BigQueryStreamingBufferEmptySensor(
+        task_id="check_streaming_buffer_empty_async",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_NAME,
+        poke_interval=30,
+        timeout=5400,  # 90 minutes - Google Cloud flushes streaming buffer within 90 minutes
+    )
+    # [END howto_sensor_bigquery_streaming_buffer_empty_async]

Review Comment:
   @radhwene could you explain the reason for this task? I do not see any `async` operation here; it is the same task as the sync one. Could you explain, please?
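
   For reference, a genuinely deferrable variant would normally just set the `deferrable` flag, exactly as the `_def` example above already does. A minimal sketch of what I would expect here, reusing the names from this diff (the inline comment is mine):

   ```python
   check_streaming_buffer_empty_async = BigQueryStreamingBufferEmptySensor(
       task_id="check_streaming_buffer_empty_async",
       project_id=PROJECT_ID,
       dataset_id=DATASET_NAME,
       table_id=TABLE_NAME,
       deferrable=True,  # without this flag the sensor pokes synchronously
       poke_interval=30,
       timeout=5400,
   )
   ```

   As written in the diff, the `_async` task differs from the sync example only in its `task_id`.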



##########
providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py:
##########
@@ -256,3 +257,133 @@ def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None
 
         message = "No event received in trigger callback"
         raise AirflowException(message)
+
+
+class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
+    """
+    Sensor for checking whether the streaming buffer in a BigQuery table is empty.
+
+    The BigQueryStreamingBufferEmptySensor waits for the streaming buffer in a specified
+    BigQuery table to be empty before proceeding. It can be used in ETL pipelines to ensure
+    that recent streamed data has been processed before continuing downstream tasks.
+
+    :param project_id: The Google Cloud project ID where the BigQuery table resides.
+    :param dataset_id: The ID of the dataset containing the BigQuery table.
+    :param table_id: The ID of the BigQuery table to monitor.
+    :param gcp_conn_id: The Airflow connection ID for GCP. Defaults to "google_cloud_default".
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :param deferrable: Run sensor in deferrable mode. Defaults to False.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "table_id",
+        "impersonation_chain",
+    )
+
+    ui_color = "#f0eee4"
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        **kwargs,
+    ) -> None:
+        if deferrable and "poke_interval" not in kwargs:
+            # TODO: Remove once deprecated
+            if "polling_interval" in kwargs:
+                kwargs["poke_interval"] = kwargs["polling_interval"]
+                warnings.warn(
+                    "Argument `polling_interval` is deprecated and will be 
removed "
+                    "in a future release.  Please use `poke_interval` 
instead.",
+                    AirflowProviderDeprecationWarning,
+                    stacklevel=2,
+                )

Review Comment:
   @radhwene could you please remove this code? It is deprecated, and we should not add deprecated code to a new operator.
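
   A minimal sketch of the constructor without the deprecated shim. Note that the attribute assignments after `super().__init__` are my assumption, since the quoted diff is truncated at this point:

   ```python
   def __init__(
       self,
       *,
       project_id: str,
       dataset_id: str,
       table_id: str,
       gcp_conn_id: str = "google_cloud_default",
       impersonation_chain: str | Sequence[str] | None = None,
       deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
       **kwargs,
   ) -> None:
       # No `polling_interval` back-compat handling: a new sensor should only
       # accept `poke_interval`, which BaseSensorOperator already supports.
       super().__init__(**kwargs)
       self.project_id = project_id
       self.dataset_id = dataset_id
       self.table_id = table_id
       self.gcp_conn_id = gcp_conn_id
       self.impersonation_chain = impersonation_chain
       self.deferrable = deferrable
   ```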


