MaksYermak commented on code in PR #61148:
URL: https://github.com/apache/airflow/pull/61148#discussion_r2815604019
##########
providers/google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py:
##########
@@ -152,22 +162,105 @@
)
# [END howto_sensor_bigquery_table_partition_async]
+ # [START howto_sensor_bigquery_streaming_buffer_empty]
+ check_streaming_buffer_empty = BigQueryStreamingBufferEmptySensor(
+ task_id="check_streaming_buffer_empty",
+ project_id=PROJECT_ID,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME,
+ poke_interval=30,
+ timeout=5400, # 90 minutes - Google Cloud flushes streaming buffer
within 90 minutes
+ )
+ # [END howto_sensor_bigquery_streaming_buffer_empty]
+
+ # Streaming operations: INSERT, UPDATE, DELETE
+ # These operations write data to the streaming buffer before being flushed
to persistent storage
+ stream_insert = BigQueryInsertJobOperator(
+ task_id="stream_insert",
+ configuration={
+ "query": {
+ "query": STREAMING_INSERT_QUERY,
+ "useLegacySql": False,
+ }
+ },
+ )
+
+ stream_update = BigQueryInsertJobOperator(
+ task_id="stream_update",
+ configuration={
+ "query": {
+ "query": STREAMING_UPDATE_QUERY,
+ "useLegacySql": False,
+ }
+ },
+ )
+
+ stream_delete = BigQueryInsertJobOperator(
+ task_id="stream_delete",
+ configuration={
+ "query": {
+ "query": STREAMING_DELETE_QUERY,
+ "useLegacySql": False,
+ }
+ },
+ )
+
+ # [START howto_sensor_bigquery_streaming_buffer_empty_defered]
+ check_streaming_buffer_empty_def = BigQueryStreamingBufferEmptySensor(
+ task_id="check_streaming_buffer_empty_def",
+ project_id=PROJECT_ID,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME,
+ deferrable=True,
+ poke_interval=30,
+ timeout=5400, # 90 minutes - Google Cloud flushes streaming buffer
within 90 minutes
+ )
+ # [END howto_sensor_bigquery_streaming_buffer_empty_defered]
+
+ # [START howto_sensor_bigquery_streaming_buffer_empty_async]
+ check_streaming_buffer_empty_async = BigQueryStreamingBufferEmptySensor(
+ task_id="check_streaming_buffer_empty_async",
+ project_id=PROJECT_ID,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME,
+ poke_interval=30,
+ timeout=5400, # 90 minutes - Google Cloud flushes streaming buffer
within 90 minutes
+ )
+ # [END howto_sensor_bigquery_streaming_buffer_empty_async]
Review Comment:
@radhwene could you explain the reason for this task? I do not see any
`async` operation here — it is just the same task as the sync one. Could
you explain, please?
##########
providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py:
##########
@@ -256,3 +257,133 @@ def execute_complete(self, context: dict[str, Any],
event: dict[str, str] | None
message = "No event received in trigger callback"
raise AirflowException(message)
+
+
+class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
+ """
+ Sensor for checking whether the streaming buffer in a BigQuery table is
empty.
+
+ The BigQueryStreamingBufferEmptySensor waits for the streaming buffer in a
specified
+ BigQuery table to be empty before proceeding. It can be used in ETL
pipelines to ensure
+ that recent streamed data has been processed before continuing downstream
tasks.
+
+ :param project_id: The Google Cloud project ID where the BigQuery table
resides.
+ :param dataset_id: The ID of the dataset containing the BigQuery table.
+ :param table_id: The ID of the BigQuery table to monitor.
+ :param gcp_conn_id: The Airflow connection ID for GCP. Defaults to
"google_cloud_default".
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :param deferrable: Run sensor in deferrable mode. Defaults to False.
+ """
+
+ template_fields: Sequence[str] = (
+ "project_id",
+ "dataset_id",
+ "table_id",
+ "impersonation_chain",
+ )
+
+ ui_color = "#f0eee4"
+
+ def __init__(
+ self,
+ *,
+ project_id: str,
+ dataset_id: str,
+ table_id: str,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ **kwargs,
+ ) -> None:
+ if deferrable and "poke_interval" not in kwargs:
+ # TODO: Remove once deprecated
+ if "polling_interval" in kwargs:
+ kwargs["poke_interval"] = kwargs["polling_interval"]
+ warnings.warn(
+ "Argument `polling_interval` is deprecated and will be
removed "
+ "in a future release. Please use `poke_interval`
instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
Review Comment:
@radhwene could you please remove this code? Since it is deprecated, we
should not add deprecated code to a new operator.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]