This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new fef4cf8a028 Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing deletes or updates to BigQuery fef4cf8a028 is described below commit fef4cf8a0289e060ca05b6808beaf5734707d8a2 Author: Reuven Lax <re...@google.com> AuthorDate: Thu Aug 24 15:45:15 2023 -0700 Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing deletes or updates to BigQuery --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 39 ++++++++++++++++---- .../sdk/io/gcp/bigquery/CreateTableHelpers.java | 9 +++++ .../beam/sdk/io/gcp/bigquery/CreateTables.java | 5 +++ .../sdk/io/gcp/bigquery/DynamicDestinations.java | 9 +++++ .../gcp/bigquery/DynamicDestinationsHelpers.java | 30 ++++++++++++++++ .../bigquery/StorageApiDynamicDestinations.java | 36 ++----------------- .../bigquery/StorageApiWriteUnshardedRecords.java | 1 + .../bigquery/StorageApiWritesShardedRecords.java | 4 ++- .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 42 +++++----------------- 9 files changed, 100 insertions(+), 75 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 96da67321cb..58d76931244 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -30,6 +30,7 @@ import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableCell; +import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import 
com.google.api.services.bigquery.model.TableRow; @@ -510,7 +511,8 @@ import org.slf4j.LoggerFactory; * .apply(BigQueryIO.applyRowMutations() * .to(my_project:my_dataset.my_table) * .withSchema(schema) - * .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)); + * .withPrimaryKey(ImmutableList.of("field1", "field2")) + * .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)); * }</pre> * * <p>If writing a type other than TableRow (e.g. using {@link BigQueryIO#writeGenericRecords} or @@ -523,12 +525,17 @@ import org.slf4j.LoggerFactory; * cdcEvent.apply(BigQueryIO.write() * .to("my-project:my_dataset.my_table") * .withSchema(schema) + * .withPrimaryKey(ImmutableList.of("field1", "field2")) * .withFormatFunction(CdcEvent::getTableRow) * .withRowMutationInformationFn(cdc -> RowMutationInformation.of(cdc.getChangeType(), * cdc.getSequenceNumber())) * .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE) - * .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)); + * .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)); * }</pre> + * + * <p>Note that in order to apply upserts or deletes, the table must be set up with a primary key. If + * the table does not already exist and CREATE_IF_NEEDED is used, a primary key must be + * specified.
*/ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20506) @@ -2318,6 +2325,8 @@ public class BigQueryIO { abstract @Nullable String getKmsKey(); + abstract @Nullable List<String> getPrimaryKey(); + abstract Boolean getOptimizeWrites(); abstract Boolean getUseBeamSchema(); @@ -2416,7 +2425,9 @@ public class BigQueryIO { abstract Builder<T> setIgnoreInsertIds(Boolean ignoreInsertIds); - abstract Builder<T> setKmsKey(String kmsKey); + abstract Builder<T> setKmsKey(@Nullable String kmsKey); + + abstract Builder<T> setPrimaryKey(@Nullable List<String> primaryKey); abstract Builder<T> setOptimizeWrites(Boolean optimizeWrites); @@ -2947,6 +2958,10 @@ public class BigQueryIO { return toBuilder().setKmsKey(kmsKey).build(); } + public Write<T> withPrimaryKey(List<String> primaryKey) { + return toBuilder().setPrimaryKey(primaryKey).build(); + } + /** * If true, enables new codepaths that are expected to use less resources while writing to * BigQuery. Not enabled by default in order to maintain backwards compatibility. @@ -3241,6 +3256,7 @@ public class BigQueryIO { LOG.warn("Setting the number of Storage API streams" + error); } } + if (method == Method.STORAGE_API_AT_LEAST_ONCE && getStorageApiNumStreams(bqOptions) != 0) { LOG.warn( "Setting a number of Storage API streams is only supported when using STORAGE_WRITE_API"); @@ -3254,9 +3270,12 @@ public class BigQueryIO { if (getRowMutationInformationFn() != null) { checkArgument(getMethod() == Method.STORAGE_API_AT_LEAST_ONCE); checkArgument( - getCreateDisposition() == CreateDisposition.CREATE_NEVER, - "CREATE_IF_NEEDED is not supported when applying row updates. 
Tables must be precreated " - + "with a primary key specified."); + getCreateDisposition() == CreateDisposition.CREATE_NEVER || getPrimaryKey() != null, + "If specifying CREATE_IF_NEEDED along with row updates, a primary key needs to be specified"); + } + if (getPrimaryKey() != null) { + checkArgument( + getMethod() != Method.FILE_LOADS, "Primary key not supported when using FILE_LOADS"); } if (getAutoSchemaUpdate()) { @@ -3311,6 +3330,14 @@ public class BigQueryIO { getJsonTimePartitioning(), StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering()))); } + if (getPrimaryKey() != null) { + dynamicDestinations = + new DynamicDestinationsHelpers.ConstantTableConstraintsDestinations<>( + (DynamicDestinations<T, TableDestination>) dynamicDestinations, + new TableConstraints() + .setPrimaryKey( + new TableConstraints.PrimaryKey().setColumns(getPrimaryKey()))); + } } return expandTyped(input, dynamicDestinations); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index e7d4c32993b..6edd3f71cc7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -25,6 +25,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; @@ -86,6 +87,7 @@ public class CreateTableHelpers { BigQueryOptions 
bigQueryOptions, TableDestination tableDestination, Supplier<@Nullable TableSchema> schemaSupplier, + Supplier<@Nullable TableConstraints> tableConstraintsSupplier, CreateDisposition createDisposition, @Nullable Coder<?> tableDestinationCoder, @Nullable String kmsKey, @@ -125,6 +127,7 @@ public class CreateTableHelpers { tryCreateTable( bigQueryOptions, schemaSupplier, + tableConstraintsSupplier, tableDestination, createDisposition, tableSpec, @@ -139,6 +142,7 @@ public class CreateTableHelpers { private static void tryCreateTable( BigQueryOptions options, Supplier<@Nullable TableSchema> schemaSupplier, + Supplier<@Nullable TableConstraints> tableConstraintsSupplier, TableDestination tableDestination, CreateDisposition createDisposition, String tableSpec, @@ -151,6 +155,7 @@ public class CreateTableHelpers { tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC) == null) { TableSchema tableSchema = schemaSupplier.get(); + @Nullable TableConstraints tableConstraints = tableConstraintsSupplier.get(); Preconditions.checkArgumentNotNull( tableSchema, "Unless create disposition is %s, a schema must be specified, i.e. 
" @@ -162,6 +167,10 @@ public class CreateTableHelpers { tableDestination); Table table = new Table().setTableReference(tableReference).setSchema(tableSchema); + if (tableConstraints != null) { + table = table.setTableConstraints(tableConstraints); + } + String tableDescription = tableDestination.getTableDescription(); if (tableDescription != null) { table = table.setDescription(tableDescription); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index 1856b5ab63f..7e5299b7e67 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableSchema; import java.util.List; import java.util.Map; @@ -113,10 +114,14 @@ public class CreateTables<DestinationT, ElementT> dest); Supplier<@Nullable TableSchema> schemaSupplier = () -> dynamicDestinations.getSchema(dest); + Supplier<@Nullable TableConstraints> tableConstraintsSupplier = + () -> dynamicDestinations.getTableConstraints(dest); + return CreateTableHelpers.possiblyCreateTable( context.getPipelineOptions().as(BigQueryOptions.class), tableDestination1, schemaSupplier, + tableConstraintsSupplier, createDisposition, dynamicDestinations.getDestinationCoder(), kmsKey, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java index 
b2041f69bda..e5cf82d7c2e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableSchema; import java.io.Serializable; import java.util.List; @@ -154,6 +155,14 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab /** Returns the table schema for the destination. */ public abstract @Nullable TableSchema getSchema(DestinationT destination); + /** + * Returns TableConstraints (such as primary and foreign keys) to be used when creating the + * table. Note: this is not currently supported when using FILE_LOADS. + */ + public @Nullable TableConstraints getTableConstraints(DestinationT destination) { + return null; + } + // Gets the destination coder. If the user does not provide one, try to find one in the coder // registry. If no coder can be found, throws CannotProvideCoderException.
Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 30492647457..62355fd9417 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -23,6 +23,7 @@ import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; @@ -177,6 +178,11 @@ class DynamicDestinationsHelpers { return inner.getSchema(destination); } + @Override + public @Nullable TableConstraints getTableConstraints(DestinationT destination) { + return inner.getTableConstraints(destination); + } + @Override public TableDestination getTable(DestinationT destination) { return inner.getTable(destination); @@ -214,6 +220,30 @@ class DynamicDestinationsHelpers { } } + static class ConstantTableConstraintsDestinations<T, DestinationT> + extends DelegatingDynamicDestinations<T, DestinationT> { + private final String jsonTableConstraints; + + ConstantTableConstraintsDestinations( + DynamicDestinations<T, DestinationT> inner, TableConstraints tableConstraints) { + super(inner); + this.jsonTableConstraints = BigQueryHelpers.toJsonString(tableConstraints); + } + + @Override + public TableConstraints getTableConstraints(DestinationT destination) { + return BigQueryHelpers.fromJsonString(jsonTableConstraints, 
TableConstraints.class); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("inner", inner) + .add("tableConstraints", jsonTableConstraints) + .toString(); + } + } + /** Returns the same schema for every table. */ static class ConstantSchemaDestinations<T, DestinationT> extends DelegatingDynamicDestinations<T, DestinationT> { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java index fdf330d378f..8ec4d52e3b9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java @@ -18,19 +18,14 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; import com.google.protobuf.DescriptorProtos; -import java.util.List; import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.ValueInSingleWindow; /** Base dynamicDestinations class used by the Storage API sink. 
*/ abstract class StorageApiDynamicDestinations<T, DestinationT> - extends DynamicDestinations<T, DestinationT> { + extends DynamicDestinationsHelpers.DelegatingDynamicDestinations<T, DestinationT> { public interface MessageConverter<T> { com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema(); @@ -42,40 +37,13 @@ abstract class StorageApiDynamicDestinations<T, DestinationT> TableRow toTableRow(T element); } - private DynamicDestinations<T, DestinationT> inner; - StorageApiDynamicDestinations(DynamicDestinations<T, DestinationT> inner) { - this.inner = inner; + super(inner); } public abstract MessageConverter<T> getMessageConverter( DestinationT destination, DatasetService datasetService) throws Exception; - @Override - public DestinationT getDestination(@Nullable ValueInSingleWindow<T> element) { - return inner.getDestination(element); - } - - @Override - public @Nullable Coder<DestinationT> getDestinationCoder() { - return inner.getDestinationCoder(); - } - - @Override - public TableDestination getTable(DestinationT destination) { - return inner.getTable(destination); - } - - @Override - public @Nullable TableSchema getSchema(DestinationT destination) { - return inner.getSchema(destination); - } - - @Override - public List<PCollectionView<?>> getSideInputs() { - return inner.getSideInputs(); - } - @Override void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) { super.setSideInputAccessorFromProcessContext(context); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 27a5b30c156..3ac5140f73f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -939,6 +939,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> c.getPipelineOptions().as(BigQueryOptions.class), tableDestination1, () -> dynamicDestinations.getSchema(destination), + () -> dynamicDestinations.getTableConstraints(destination), createDisposition, destinationCoder, kmsKey, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index cc7f221e32e..cf7de067e15 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -441,10 +441,12 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder(); Callable<Boolean> tryCreateTable = () -> { + DestinationT dest = element.getKey().getKey(); CreateTableHelpers.possiblyCreateTable( c.getPipelineOptions().as(BigQueryOptions.class), tableDestination, - () -> dynamicDestinations.getSchema(element.getKey().getKey()), + () -> dynamicDestinations.getSchema(dest), + () -> dynamicDestinations.getTableConstraints(dest), createDisposition, destinationCoder, kmsKey, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java index c790dffe7ed..d5366fe2961 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -59,38 +60,8 @@ public class StorageApiSinkRowUpdateIT { BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID); } - private static String createTable(TableSchema tableSchema, List<String> primaryKey) - throws IOException, InterruptedException { - String table = "table" + System.nanoTime(); - - BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, table); - - StringBuilder ddl = - new StringBuilder("CREATE TABLE ") - .append(PROJECT) - .append(".") - .append(BIG_QUERY_DATASET_ID) - .append(".") - .append(table) - .append("("); - for (TableFieldSchema tableFieldSchema : tableSchema.getFields()) { - ddl.append(tableFieldSchema.getName()) - .append(" ") - .append(tableFieldSchema.getType()) - .append(","); - } - - String primaryKeyString = String.join(",", primaryKey); - ddl.append(" PRIMARY KEY ") - .append("(") - .append(primaryKeyString) - .append(")") - .append(" NOT ENFORCED) "); - ddl.append("CLUSTER BY ").append(primaryKeyString); - - BQ_CLIENT.queryWithRetriesUsingStandardSql(ddl.toString(), PROJECT); - - return PROJECT + "." + BIG_QUERY_DATASET_ID + "." + table; + private static String getTablespec() { + return PROJECT + "." + BIG_QUERY_DATASET_ID + "." 
+ "table" + System.nanoTime(); } @Test @@ -130,7 +101,8 @@ public class StorageApiSinkRowUpdateIT { new TableRow().set("key1", "foo4").set("key2", "bar4").set("value", "1"), RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, 1))); - String tableSpec = createTable(tableSchema, Lists.newArrayList("key1", "key2")); + List<String> primaryKey = Lists.newArrayList("key1", "key2"); + String tableSpec = getTablespec(); Pipeline p = Pipeline.create(); p.apply("Create rows", Create.of(items)) .apply( @@ -138,8 +110,10 @@ public class StorageApiSinkRowUpdateIT { BigQueryIO.applyRowMutations() .to(tableSpec) .withSchema(tableSchema) + .withPrimaryKey(primaryKey) + .withClustering(new Clustering().setFields(primaryKey)) .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)); + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); p.run();