reuvenlax commented on code in PR #26975: URL: https://github.com/apache/beam/pull/26975#discussion_r1224798569
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java: ########## @@ -477,6 +476,56 @@ * reviewers mentioned <a * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS"> * here</a>. + * + * <h3>Upserts and deletes</h3> + * + * The connector also supports streaming row updates to BigQuery, with the following qualifications: + * - The CREATE_IF_NEEDED CreateDisposition is not supported. Tables must be precreated with primary + * keys. - Only the STORAGE_API_AT_LEAST_ONCE method is supported. + * + * <p>Two types of updates are supported. UPSERT replaces the row with the matching primary key or + * inserts the row if none exists. DELETE removes the row with the matching primary key. Row inserts + * are still allowed as normal using a separate instance of the sink; however, care must be taken not + * to violate primary key uniqueness constraints, as those constraints are not enforced by BigQuery. + * If a table contains multiple rows with the same primary key, then row updates may not work as + * expected. In particular, these inserts should _only_ be done using the exactly-once sink + * (STORAGE_WRITE_API), as the at-least-once sink may duplicate inserts, violating the constraint. + * + * <p>Since PCollections are unordered, in order to properly sequence updates a sequence number must + * be set on each update. BigQuery uses this sequence number to ensure that updates are correctly + * applied to the table even if they arrive out of order. + * + * <p>The simplest way to apply row updates when writing {@link TableRow} objects is to use the {@link + * Write#applyRowMutations} method. Each {@link RowMutation} element contains a {@link TableRow}, an + * update type (UPSERT or DELETE), and a sequence number to order the updates. 
+ * + * <pre>{@code + * PCollection<TableRow> rows = ...; + * rows.apply(MapElements + * .into(new TypeDescriptor<RowMutation>(){}) + * .via(tableRow -> RowMutation.of(tableRow, RowMutationInformation.of(getUpdateType(tableRow), getSequenceNumber(tableRow))))) + * .apply(BigQueryIO.applyRowMutations() + * .to("my-project:my_dataset.my_table") + * .withSchema(schema) + * .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE) + * .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)); + * }</pre> + * + * <p>If writing a type other than TableRow (e.g. using {@link BigQueryIO#writeGenericRecords} or + * writing a custom user type), then the {@link Write#withRowMutationFn} method can be used to set + * an update type and sequence number for each record. For example: + * + * <pre>{@code + * PCollection<CdcEvent> cdcEvent = ...; + * + * cdcEvent.apply(BigQueryIO.write() + * .to("my-project:my_dataset.my_table") + * .withSchema(schema) + * .withFormatFunction(CdcEvent::getTableRow) + * .withRowMutationFn(cdc -> RowMutationInformation.of(cdc.getChangeType(), cdc.getSequenceNumber())) Review Comment: done ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; + +/** + * A convenience class for applying row updates to BigQuery using {@link + * BigQueryIO.Write#applyRowMutations}. This class encapsulates a {@link TableRow} payload along + * with information how to update the row. A sequence number must also be supplied to order the + * updates. Incorrect sequence numbers will result in unexpected state in the BigQuery table. + */ +@AutoValue +public abstract class RowMutation { + public abstract TableRow getTableRow(); + + public abstract RowMutationInformation getMutationInformation(); + + public static RowMutation of(TableRow tableRow, RowMutationInformation rowMutationInformation) { + return new AutoValue_RowMutation(tableRow, rowMutationInformation); + } + + public static class RowMutationCoder extends AtomicCoder<RowMutation> { + private static final RowMutationCoder INSTANCE = new RowMutationCoder(); + + public static RowMutationCoder of() { + return INSTANCE; + } + + @Override + public void encode(RowMutation value, OutputStream outStream) throws IOException { + TableRowJsonCoder.of().encode(value.getTableRow(), outStream); + VarIntCoder.of() + .encode(value.getMutationInformation().getMutationType().ordinal(), outStream); + VarLongCoder.of().encode(value.getMutationInformation().getSequenceNumber(), outStream); + } + + @Override + public RowMutation decode(InputStream inStream) throws IOException { + return RowMutation.of( + TableRowJsonCoder.of().decode(inStream), + RowMutationInformation.of( Review Comment: not worried here since 
the parameters have different types. If anyone reversed the parameters without updating this code, there would be a compile error ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java: ########## @@ -75,6 +77,11 @@ * with the Storage write API. */ public class TableRowToStorageApiProto { + private static final String CDC_CHANGE_SQN_COLUMN = "_CHANGE_SEQUENCE_NUMBER"; + private static final String CDC_CHANGE_TYPE_COLUMN = "_CHANGE_TYPE"; + private static final Set<String> CDC_COLUMNS = Review Comment: refactored ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java: ########## @@ -92,8 +93,26 @@ public class FakeDatasetService implements DatasetService, Serializable { public void close() throws Exception {} static class Stream { + static class Entry { + enum UpdateType { + INSERT, Review Comment: yeah, because we now use it internal to the test fixture ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/TableContainer.java: ########## @@ -53,12 +113,64 @@ long addRow(TableRow row, String id) { } } + void upsertRow(TableRow row, long sequenceNumber) { + List<Object> primaryKey = getPrimaryKey(row); + if (primaryKey == null) { + throw new RuntimeException("Upserts only allowed when using primary keys"); + } + long lastSequenceNumberForKey = lastSequenceNumber.getOrDefault(primaryKey, -1L); Review Comment: done ########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java: ########## @@ -446,9 +449,32 @@ private void assertBaseRecord(DynamicMessage msg, Map<String, Object> expectedFi public void testMessageFromGenericRecord() throws Exception { Descriptors.Descriptor descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema( - 
AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), true); + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), + true, + false); DynamicMessage msg = - AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, nestedRecord); + AvroGenericRecordToStorageApiProto.messageFromGenericRecord( + descriptor, nestedRecord, null, -1); + + assertEquals(2, msg.getAllFields().size()); + + Map<String, Descriptors.FieldDescriptor> fieldDescriptors = + descriptor.getFields().stream() + .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, Functions.identity())); + DynamicMessage nestedMsg = (DynamicMessage) msg.getField(fieldDescriptors.get("nested")); + assertBaseRecord(nestedMsg, baseProtoExpectedFields); + } + + @Test + public void testCdcFields() throws Exception { + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), + true, + false); + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord( + descriptor, nestedRecord, null, -1); Review Comment: good catch - fixed. 
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java: ########## @@ -392,6 +393,26 @@ public void testMessageFromTableRow() throws Exception { assertBaseRecord(nestedMsg); } + @Test + public void testCdcFields() throws Exception { + Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(NESTED_SCHEMA), true, true); + assertNotNull(descriptor.findFieldByName("_CHANGE_TYPE")); + assertNotNull(descriptor.findFieldByName("_CHANGE_SEQUENCE_NUMBER")); + DynamicMessage msg = + BeamRowToStorageApiProto.messageFromBeamRow(descriptor, NESTED_ROW, "UPDATE", 42); + assertEquals(5, msg.getAllFields().size()); + + Map<String, FieldDescriptor> fieldDescriptors = + descriptor.getFields().stream() + .collect(Collectors.toMap(FieldDescriptor::getName, Functions.identity())); + DynamicMessage nestedMsg = (DynamicMessage) msg.getField(fieldDescriptors.get("nested")); + assertBaseRecord(nestedMsg); + assertEquals("UPDATE", msg.getField(descriptor.findFieldByName("_CHANGE_TYPE"))); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
