This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fef4cf8a028 Merge pull request #28124: Allow using CREATE_IF_NEEDED 
when writing deletes or updates to BigQuery
fef4cf8a028 is described below

commit fef4cf8a0289e060ca05b6808beaf5734707d8a2
Author: Reuven Lax <re...@google.com>
AuthorDate: Thu Aug 24 15:45:15 2023 -0700

    Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing 
deletes or updates to BigQuery
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 39 ++++++++++++++++----
 .../sdk/io/gcp/bigquery/CreateTableHelpers.java    |  9 +++++
 .../beam/sdk/io/gcp/bigquery/CreateTables.java     |  5 +++
 .../sdk/io/gcp/bigquery/DynamicDestinations.java   |  9 +++++
 .../gcp/bigquery/DynamicDestinationsHelpers.java   | 30 ++++++++++++++++
 .../bigquery/StorageApiDynamicDestinations.java    | 36 ++-----------------
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  1 +
 .../bigquery/StorageApiWritesShardedRecords.java   |  4 ++-
 .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 42 +++++-----------------
 9 files changed, 100 insertions(+), 75 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 96da67321cb..58d76931244 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -30,6 +30,7 @@ import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -510,7 +511,8 @@ import org.slf4j.LoggerFactory;
  *    .apply(BigQueryIO.applyRowMutations()
  *           .to(my_project:my_dataset.my_table)
  *           .withSchema(schema)
- *           .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER));
+ *           .withPrimaryKey(ImmutableList.of("field1", "field2"))
+ *           .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
  * }</pre>
  *
  * <p>If writing a type other than TableRow (e.g. using {@link 
BigQueryIO#writeGenericRecords} or
@@ -523,12 +525,17 @@ import org.slf4j.LoggerFactory;
  * cdcEvent.apply(BigQueryIO.write()
  *          .to("my-project:my_dataset.my_table")
  *          .withSchema(schema)
+ *          .withPrimaryKey(ImmutableList.of("field1", "field2"))
  *          .withFormatFunction(CdcEvent::getTableRow)
  *          .withRowMutationInformationFn(cdc -> 
RowMutationInformation.of(cdc.getChangeType(),
  *                                                                         
cdc.getSequenceNumber()))
  *          .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
- *          .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER));
+ *          .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
  * }</pre>
+ *
+ * <p>Note that in order to use inserts or deletes, the table must be set up 
with a primary key. If
+ * the table is not previously created and CREATE_IF_NEEDED is used, a primary 
key must be
+ * specified.
  */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20506)
@@ -2318,6 +2325,8 @@ public class BigQueryIO {
 
     abstract @Nullable String getKmsKey();
 
+    abstract @Nullable List<String> getPrimaryKey();
+
     abstract Boolean getOptimizeWrites();
 
     abstract Boolean getUseBeamSchema();
@@ -2416,7 +2425,9 @@ public class BigQueryIO {
 
       abstract Builder<T> setIgnoreInsertIds(Boolean ignoreInsertIds);
 
-      abstract Builder<T> setKmsKey(String kmsKey);
+      abstract Builder<T> setKmsKey(@Nullable String kmsKey);
+
+      abstract Builder<T> setPrimaryKey(@Nullable List<String> primaryKey);
 
       abstract Builder<T> setOptimizeWrites(Boolean optimizeWrites);
 
@@ -2947,6 +2958,10 @@ public class BigQueryIO {
       return toBuilder().setKmsKey(kmsKey).build();
     }
 
+    public Write<T> withPrimaryKey(List<String> primaryKey) {
+      return toBuilder().setPrimaryKey(primaryKey).build();
+    }
+
     /**
      * If true, enables new codepaths that are expected to use less resources 
while writing to
      * BigQuery. Not enabled by default in order to maintain backwards 
compatibility.
@@ -3241,6 +3256,7 @@ public class BigQueryIO {
           LOG.warn("Setting the number of Storage API streams" + error);
         }
       }
+
       if (method == Method.STORAGE_API_AT_LEAST_ONCE && 
getStorageApiNumStreams(bqOptions) != 0) {
         LOG.warn(
             "Setting a number of Storage API streams is only supported when 
using STORAGE_WRITE_API");
@@ -3254,9 +3270,12 @@ public class BigQueryIO {
       if (getRowMutationInformationFn() != null) {
         checkArgument(getMethod() == Method.STORAGE_API_AT_LEAST_ONCE);
         checkArgument(
-            getCreateDisposition() == CreateDisposition.CREATE_NEVER,
-            "CREATE_IF_NEEDED is not supported when applying row updates. 
Tables must be precreated "
-                + "with a primary key specified.");
+            getCreateDisposition() == CreateDisposition.CREATE_NEVER || 
getPrimaryKey() != null,
+            "If specifying CREATE_IF_NEEDED along with row updates, a primary 
key needs to be specified");
+      }
+      if (getPrimaryKey() != null) {
+        checkArgument(
+            getMethod() != Method.FILE_LOADS, "Primary key not supported when 
using FILE_LOADS");
       }
 
       if (getAutoSchemaUpdate()) {
@@ -3311,6 +3330,14 @@ public class BigQueryIO {
                   getJsonTimePartitioning(),
                   
StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering())));
         }
+        if (getPrimaryKey() != null) {
+          dynamicDestinations =
+              new 
DynamicDestinationsHelpers.ConstantTableConstraintsDestinations<>(
+                  (DynamicDestinations<T, TableDestination>) 
dynamicDestinations,
+                  new TableConstraints()
+                      .setPrimaryKey(
+                          new 
TableConstraints.PrimaryKey().setColumns(getPrimaryKey())));
+        }
       }
       return expandTyped(input, dynamicDestinations);
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
index e7d4c32993b..6edd3f71cc7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
@@ -25,6 +25,7 @@ import com.google.api.gax.rpc.ApiException;
 import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.EncryptionConfiguration;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
@@ -86,6 +87,7 @@ public class CreateTableHelpers {
       BigQueryOptions bigQueryOptions,
       TableDestination tableDestination,
       Supplier<@Nullable TableSchema> schemaSupplier,
+      Supplier<@Nullable TableConstraints> tableConstraintsSupplier,
       CreateDisposition createDisposition,
       @Nullable Coder<?> tableDestinationCoder,
       @Nullable String kmsKey,
@@ -125,6 +127,7 @@ public class CreateTableHelpers {
           tryCreateTable(
               bigQueryOptions,
               schemaSupplier,
+              tableConstraintsSupplier,
               tableDestination,
               createDisposition,
               tableSpec,
@@ -139,6 +142,7 @@ public class CreateTableHelpers {
   private static void tryCreateTable(
       BigQueryOptions options,
       Supplier<@Nullable TableSchema> schemaSupplier,
+      Supplier<@Nullable TableConstraints> tableConstraintsSupplier,
       TableDestination tableDestination,
       CreateDisposition createDisposition,
       String tableSpec,
@@ -151,6 +155,7 @@ public class CreateTableHelpers {
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
         TableSchema tableSchema = schemaSupplier.get();
+        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
         Preconditions.checkArgumentNotNull(
             tableSchema,
             "Unless create disposition is %s, a schema must be specified, i.e. 
"
@@ -162,6 +167,10 @@ public class CreateTableHelpers {
             tableDestination);
         Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
+        if (tableConstraints != null) {
+          table = table.setTableConstraints(tableConstraints);
+        }
+
         String tableDescription = tableDestination.getTableDescription();
         if (tableDescription != null) {
           table = table.setDescription(tableDescription);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index 1856b5ab63f..7e5299b7e67 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.util.List;
 import java.util.Map;
@@ -113,10 +114,14 @@ public class CreateTables<DestinationT, ElementT>
                     dest);
                 Supplier<@Nullable TableSchema> schemaSupplier =
                     () -> dynamicDestinations.getSchema(dest);
+                Supplier<@Nullable TableConstraints> tableConstraintsSupplier =
+                    () -> dynamicDestinations.getTableConstraints(dest);
+
                 return CreateTableHelpers.possiblyCreateTable(
                     context.getPipelineOptions().as(BigQueryOptions.class),
                     tableDestination1,
                     schemaSupplier,
+                    tableConstraintsSupplier,
                     createDisposition,
                     dynamicDestinations.getDestinationCoder(),
                     kmsKey,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index b2041f69bda..e5cf82d7c2e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static 
org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
+import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.io.Serializable;
 import java.util.List;
@@ -154,6 +155,14 @@ public abstract class DynamicDestinations<T, DestinationT> 
implements Serializab
   /** Returns the table schema for the destination. */
   public abstract @Nullable TableSchema getSchema(DestinationT destination);
 
+  /**
+   * Returns TableConstraints (including primary and foreign key) to be used 
when creating the
+   * table. Note: this is not currently supported when using FILE_LOADS.
+   */
+  public @Nullable TableConstraints getTableConstraints(DestinationT 
destination) {
+    return null;
+  }
+
   // Gets the destination coder. If the user does not provide one, try to find 
one in the coder
   // registry. If no coder can be found, throws CannotProvideCoderException.
   Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 30492647457..62355fd9417 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -23,6 +23,7 @@ import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
@@ -177,6 +178,11 @@ class DynamicDestinationsHelpers {
       return inner.getSchema(destination);
     }
 
+    @Override
+    public @Nullable TableConstraints getTableConstraints(DestinationT 
destination) {
+      return inner.getTableConstraints(destination);
+    }
+
     @Override
     public TableDestination getTable(DestinationT destination) {
       return inner.getTable(destination);
@@ -214,6 +220,30 @@ class DynamicDestinationsHelpers {
     }
   }
 
+  static class ConstantTableConstraintsDestinations<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
+    private final String jsonTableConstraints;
+
+    ConstantTableConstraintsDestinations(
+        DynamicDestinations<T, DestinationT> inner, TableConstraints 
tableConstraints) {
+      super(inner);
+      this.jsonTableConstraints = 
BigQueryHelpers.toJsonString(tableConstraints);
+    }
+
+    @Override
+    public TableConstraints getTableConstraints(DestinationT destination) {
+      return BigQueryHelpers.fromJsonString(jsonTableConstraints, 
TableConstraints.class);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("inner", inner)
+          .add("tableConstraints", jsonTableConstraints)
+          .toString();
+    }
+  }
+
   /** Returns the same schema for every table. */
   static class ConstantSchemaDestinations<T, DestinationT>
       extends DelegatingDynamicDestinations<T, DestinationT> {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
index fdf330d378f..8ec4d52e3b9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
@@ -18,19 +18,14 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
 import com.google.protobuf.DescriptorProtos;
-import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /** Base dynamicDestinations class used by the Storage API sink. */
 abstract class StorageApiDynamicDestinations<T, DestinationT>
-    extends DynamicDestinations<T, DestinationT> {
+    extends DynamicDestinationsHelpers.DelegatingDynamicDestinations<T, 
DestinationT> {
   public interface MessageConverter<T> {
     com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema();
 
@@ -42,40 +37,13 @@ abstract class StorageApiDynamicDestinations<T, 
DestinationT>
     TableRow toTableRow(T element);
   }
 
-  private DynamicDestinations<T, DestinationT> inner;
-
   StorageApiDynamicDestinations(DynamicDestinations<T, DestinationT> inner) {
-    this.inner = inner;
+    super(inner);
   }
 
   public abstract MessageConverter<T> getMessageConverter(
       DestinationT destination, DatasetService datasetService) throws 
Exception;
 
-  @Override
-  public DestinationT getDestination(@Nullable ValueInSingleWindow<T> element) 
{
-    return inner.getDestination(element);
-  }
-
-  @Override
-  public @Nullable Coder<DestinationT> getDestinationCoder() {
-    return inner.getDestinationCoder();
-  }
-
-  @Override
-  public TableDestination getTable(DestinationT destination) {
-    return inner.getTable(destination);
-  }
-
-  @Override
-  public @Nullable TableSchema getSchema(DestinationT destination) {
-    return inner.getSchema(destination);
-  }
-
-  @Override
-  public List<PCollectionView<?>> getSideInputs() {
-    return inner.getSideInputs();
-  }
-
   @Override
   void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext 
context) {
     super.setSideInputAccessorFromProcessContext(context);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 27a5b30c156..3ac5140f73f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -939,6 +939,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
                 c.getPipelineOptions().as(BigQueryOptions.class),
                 tableDestination1,
                 () -> dynamicDestinations.getSchema(destination),
+                () -> dynamicDestinations.getTableConstraints(destination),
                 createDisposition,
                 destinationCoder,
                 kmsKey,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index cc7f221e32e..cf7de067e15 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -441,10 +441,12 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
       Coder<DestinationT> destinationCoder = 
dynamicDestinations.getDestinationCoder();
       Callable<Boolean> tryCreateTable =
           () -> {
+            DestinationT dest = element.getKey().getKey();
             CreateTableHelpers.possiblyCreateTable(
                 c.getPipelineOptions().as(BigQueryOptions.class),
                 tableDestination,
-                () -> dynamicDestinations.getSchema(element.getKey().getKey()),
+                () -> dynamicDestinations.getSchema(dest),
+                () -> dynamicDestinations.getTableConstraints(dest),
                 createDisposition,
                 destinationCoder,
                 kmsKey,
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
index c790dffe7ed..d5366fe2961 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
+import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
@@ -59,38 +60,8 @@ public class StorageApiSinkRowUpdateIT {
     BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
   }
 
-  private static String createTable(TableSchema tableSchema, List<String> 
primaryKey)
-      throws IOException, InterruptedException {
-    String table = "table" + System.nanoTime();
-
-    BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, table);
-
-    StringBuilder ddl =
-        new StringBuilder("CREATE TABLE ")
-            .append(PROJECT)
-            .append(".")
-            .append(BIG_QUERY_DATASET_ID)
-            .append(".")
-            .append(table)
-            .append("(");
-    for (TableFieldSchema tableFieldSchema : tableSchema.getFields()) {
-      ddl.append(tableFieldSchema.getName())
-          .append(" ")
-          .append(tableFieldSchema.getType())
-          .append(",");
-    }
-
-    String primaryKeyString = String.join(",", primaryKey);
-    ddl.append(" PRIMARY KEY ")
-        .append("(")
-        .append(primaryKeyString)
-        .append(")")
-        .append(" NOT ENFORCED) ");
-    ddl.append("CLUSTER BY ").append(primaryKeyString);
-
-    BQ_CLIENT.queryWithRetriesUsingStandardSql(ddl.toString(), PROJECT);
-
-    return PROJECT + "." + BIG_QUERY_DATASET_ID + "." + table;
+  private static String getTablespec() {
+    return PROJECT + "." + BIG_QUERY_DATASET_ID + "." + "table" + 
System.nanoTime();
   }
 
   @Test
@@ -130,7 +101,8 @@ public class StorageApiSinkRowUpdateIT {
                 new TableRow().set("key1", "foo4").set("key2", 
"bar4").set("value", "1"),
                 
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, 1)));
 
-    String tableSpec = createTable(tableSchema, Lists.newArrayList("key1", 
"key2"));
+    List<String> primaryKey = Lists.newArrayList("key1", "key2");
+    String tableSpec = getTablespec();
     Pipeline p = Pipeline.create();
     p.apply("Create rows", Create.of(items))
         .apply(
@@ -138,8 +110,10 @@ public class StorageApiSinkRowUpdateIT {
             BigQueryIO.applyRowMutations()
                 .to(tableSpec)
                 .withSchema(tableSchema)
+                .withPrimaryKey(primaryKey)
+                .withClustering(new Clustering().setFields(primaryKey))
                 .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
-                
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
+                
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
 
     p.run();
 

Reply via email to