[
https://issues.apache.org/jira/browse/BEAM-13953?focusedWorklogId=761172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761172
]
ASF GitHub Bot logged work on BEAM-13953:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Apr/22 00:20
Start Date: 23/Apr/22 00:20
Worklog Time Spent: 10m
Work Description: chamikaramj commented on code in PR #17391:
URL: https://github.com/apache/beam/pull/17391#discussion_r856622614
##########
website/www/site/content/en/documentation/io/built-in/google-bigquery.md:
##########
@@ -770,6 +768,120 @@ You can either keep retrying, or return the failed
records in a separate
`PCollection` using the `WriteResult.getFailedInserts()` method.
{{< /paragraph >}}
+### Using the Storage Write API {#storage-write-api}
+
+Starting with version 2.36.0 of the Beam SDK for Java, you can use the
+[BigQuery Storage Write API](https://cloud.google.com/bigquery/docs/write-api)
+from the BigQueryIO connector.
+
+#### Exactly-once semantics
+
+To write to BigQuery using the Storage Write API, set `withMethod` to
+[`Method.STORAGE_WRITE_API`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#STORAGE_WRITE_API).
+Here’s an example transform that writes to BigQuery using the Storage Write
+API and exactly-once semantics:
+
+{{< highlight java >}}
+WriteResult writeResult = rows.apply("Save Rows to BigQuery",
+    BigQueryIO.writeTableRows()
+        .to(options.getFullyQualifiedTableName())
+        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+        .withMethod(Method.STORAGE_WRITE_API)
+);
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+
+If your pipeline needs to create the table (because it doesn’t exist and you
+specified the create disposition as `CREATE_IF_NEEDED`), you must provide a
+table schema with `withSchema`. The API uses the schema to validate data and
+convert it to a binary protocol.
+
+{{< highlight java >}}
+TableSchema schema = new TableSchema().setFields(
+    List.of(
+        new TableFieldSchema()
+            .setName("request_ts")
+            .setType("TIMESTAMP")
+            .setMode("REQUIRED"),
+        new TableFieldSchema()
+            .setName("user_name")
+            .setType("STRING")
+            .setMode("REQUIRED")));
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+
+For streaming pipelines, you need to set two additional parameters: the number
+of streams and the triggering frequency.
+
+{{< highlight java >}}
+BigQueryIO.writeTableRows()
+    // ...
+    .withTriggeringFrequency(Duration.standardSeconds(5))
+    .withNumStorageWriteApiStreams(3);
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+
+The number of streams defines the parallelism of the BigQueryIO Write transform
+and roughly corresponds to the number of Storage Write API streams that the
Review Comment:
We should also mention that users can make the Storage Write API the default
for all BigQuery sinks in the pipeline by using
[this](https://github.com/apache/beam/blob/2c18ce0ccd7705473aa9ecc443dcdbe223dd9449/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java#L84)
pipeline option.
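For reference, a minimal sketch of the pipeline-wide switch the review comment refers to. It assumes the linked option is the `useStorageWriteApi` property of `BigQueryOptions` (getter `getUseStorageWriteApi()`) and that `beam-sdks-java-core` and `beam-sdks-java-io-google-cloud-platform` are on the classpath. The class name `StorageWriteApiDefaultExample` and its `parse` helper are hypothetical. The sketch only parses options; it does not build a pipeline or contact BigQuery.

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical example class: shows how the pipeline-wide default could be
// switched to the Storage Write API from the command line.
public class StorageWriteApiDefaultExample {

  /** Parses command-line style arguments into BigQueryOptions. */
  static BigQueryOptions parse(String... args) {
    return PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
  }

  public static void main(String[] args) {
    // Assumption: with --useStorageWriteApi=true, BigQueryIO sinks that do not
    // call withMethod(...) explicitly default to the Storage Write API.
    BigQueryOptions options = parse("--useStorageWriteApi=true");
    System.out.println("useStorageWriteApi = " + options.getUseStorageWriteApi());
  }
}
```

The same setting could also be applied programmatically with `options.setUseStorageWriteApi(true)` before the pipeline is created. Sinks that set `withMethod(...)` explicitly are presumably unaffected by the default.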
Issue Time Tracking
-------------------
Worklog Id: (was: 761172)
Time Spent: 1h 10m (was: 1h)
> Document BigQueryIO Storage Write API methods
> ---------------------------------------------
>
> Key: BEAM-13953
> URL: https://issues.apache.org/jira/browse/BEAM-13953
> Project: Beam
> Issue Type: New Feature
> Components: io-go-gcp, io-java-gcp, io-py-gcp, website
> Reporter: Sergei Lilichenko
> Priority: P1
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Current BigQuery IO documentation
> [section|https://beam.apache.org/documentation/io/built-in/google-bigquery/]
> is missing details on STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods.
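A minimal sketch of how the two Storage Write API methods differ at the call site. It assumes the Beam release being documented exposes the at-least-once variant as `BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE`, and that the Beam GCP IO module is on the classpath. The class `StorageWriteMethods`, its `write` helper, and the table spec are hypothetical. The sketch only constructs the transforms and does not run a pipeline.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

// Hypothetical helper contrasting the exactly-once and at-least-once
// Storage Write API methods. It only constructs the transforms.
public class StorageWriteMethods {

  /** Builds a BigQueryIO write to an existing table using the chosen method. */
  static BigQueryIO.Write<TableRow> write(String tableSpec, boolean exactlyOnce) {
    Method method =
        exactlyOnce ? Method.STORAGE_WRITE_API : Method.STORAGE_API_AT_LEAST_ONCE;
    return BigQueryIO.writeTableRows()
        .to(tableSpec)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        // CREATE_NEVER: the table must already exist, so no schema is supplied.
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withMethod(method);
  }

  public static void main(String[] args) {
    // Hypothetical table spec in "project:dataset.table" form.
    String table = "my-project:my_dataset.my_table";
    BigQueryIO.Write<TableRow> exactlyOnce = write(table, true);
    BigQueryIO.Write<TableRow> atLeastOnce = write(table, false);
    System.out.println(exactlyOnce != null && atLeastOnce != null);
  }
}
```

As far as the Beam documentation describes it, the at-least-once method skips the shuffle persistence that exactly-once writes rely on. It is therefore typically cheaper and lower-latency, at the cost of possible duplicate rows in the target table.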
--
This message was sent by Atlassian Jira
(v8.20.7#820007)