reuvenlax commented on code in PR #26975:
URL: https://github.com/apache/beam/pull/26975#discussion_r1224798288
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -477,6 +476,56 @@
 * reviewers mentioned <a
 * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
 * here</a>.
+ *
+ * <h3>Upserts and deletes</h3>
+ *
+ * The connector also supports streaming row updates to BigQuery, with the following qualifications:
+ * - The CREATE_IF_NEEDED CreateDisposition is not supported. Tables must be pre-created with
+ *   primary keys.
+ * - Only the STORAGE_WRITE_API_AT_LEAST_ONCE method is supported.
+ *
+ * <p>Two types of updates are supported. UPSERT replaces the row with the matching primary key, or
+ * inserts the row if none exists. DELETE removes the row with the matching primary key. Row inserts
+ * are still allowed as normal using a separate instance of the sink; however, care must be taken not

Review Comment:
   Removed sentence

##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -477,6 +476,56 @@
 * reviewers mentioned <a
 * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
 * here</a>.
+ *
+ * <h3>Upserts and deletes</h3>
+ *
+ * The connector also supports streaming row updates to BigQuery, with the following qualifications:
+ * - The CREATE_IF_NEEDED CreateDisposition is not supported. Tables must be pre-created with
+ *   primary keys.
+ * - Only the STORAGE_WRITE_API_AT_LEAST_ONCE method is supported.
+ *
+ * <p>Two types of updates are supported. UPSERT replaces the row with the matching primary key, or
+ * inserts the row if none exists. DELETE removes the row with the matching primary key. Row inserts
+ * are still allowed as normal using a separate instance of the sink; however, care must be taken not
+ * to violate primary-key uniqueness constraints, as those constraints are not enforced by BigQuery.
+ * If a table contains multiple rows with the same primary key, then row updates may not work as
+ * expected. In particular, such inserts should _only_ be done using the exactly-once sink
+ * (STORAGE_WRITE_API), as the at-least-once sink may duplicate inserts, violating the constraint.
+ *
+ * <p>Since PCollections are unordered, a sequence number must be set on each update in order to
+ * sequence the updates properly. BigQuery uses this sequence number to ensure that updates are
+ * applied to the table correctly even if they arrive out of order.
+ *
+ * <p>The simplest way to apply row updates, when writing {@link TableRow} objects, is to use the
+ * {@link Write#applyRowMutations} method. Each {@link RowMutation} element contains a {@link TableRow},
+ * an update type (UPSERT or DELETE), and a sequence number to order the updates.
+ *
+ * <pre>{@code
+ * PCollection<TableRow> rows = ...;
+ * rows.apply(MapElements
+ *     .into(new TypeDescriptor<RowMutation>(){})
+ *     .via(tableRow -> RowMutation.of(tableRow, getUpdateType(tableRow), getSequenceNumber(tableRow))))

Review Comment:
   ack
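
For context, here is a minimal end-to-end sketch of how the snippet quoted above might be completed, assuming the API shape described in that Javadoc. The completion after the MapElements step is not taken from the diff: the `applyRowMutations()` entry point is assumed to be a static factory on `BigQueryIO`, and the table reference, `CREATE_NEVER` disposition, and `STORAGE_API_AT_LEAST_ONCE` method are illustrative assumptions. `getUpdateType` and `getSequenceNumber` are the placeholder, user-supplied helpers from the quoted example.

```java
// Sketch only -- assumes the proposed RowMutation / applyRowMutations API.
// Rows must carry enough information to derive an update type and a sequence number.
PCollection<TableRow> rows = ...;

rows
    // Wrap each TableRow in a RowMutation, as in the quoted Javadoc snippet.
    // getUpdateType / getSequenceNumber are placeholder helpers supplied by the user.
    .apply(MapElements
        .into(new TypeDescriptor<RowMutation>() {})
        .via(tableRow ->
            RowMutation.of(tableRow, getUpdateType(tableRow), getSequenceNumber(tableRow))))
    // Assumed completion: write the mutations to a table that was pre-created with a
    // primary key (CREATE_IF_NEEDED is not supported), using the at-least-once
    // Storage Write API method, per the qualifications listed in the Javadoc.
    .apply(BigQueryIO.applyRowMutations()
        .to("my-project:my_dataset.my_table") // illustrative table reference
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE));
```

On the sequence number: because PCollections are unordered, the helper should return a value that is monotonic per primary key (for example a change-data-capture timestamp or log offset), since BigQuery uses it to decide which mutation wins when updates arrive out of order.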
