This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bc39244  [BEAM-13689] Output token elements when BQ batch writes complete.
     new acd1d52  Merge pull request #16561 from [BEAM-13689] Output TableDestination elements when BQ batch writes complete.
bc39244 is described below

commit bc39244aae33b1558e5388082d196f3a7d441921
Author: Steve Niemitz <steveniem...@gmail.com>
AuthorDate: Mon Feb 1 09:06:08 2021 -0500

    [BEAM-13689] Output token elements when BQ batch writes complete.
---
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java       | 165 ++++++++++++---------
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |   2 +-
 .../sdk/io/gcp/bigquery/StreamingWriteTables.java  |   4 +-
 .../beam/sdk/io/gcp/bigquery/WriteRename.java      |  22 ++-
 .../beam/sdk/io/gcp/bigquery/WriteResult.java      |  71 +++++++--
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  94 +++++++++++-
 6 files changed, 262 insertions(+), 96 deletions(-)
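
For context, this change makes a batch-load WriteResult expose the destination tables that finished loading via the new getSuccessfulTableLoads() method added below. The following is a minimal sketch of how downstream code might consume that output; the input PCollection "rows", the table spec, the step names, and the formatting transform are illustrative assumptions, not part of this commit.

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    // Assumed to exist elsewhere: a PCollection<TableRow> named "rows".
    WriteResult result =
        rows.apply(
            "WriteBigQuery",
            BigQueryIO.writeTableRows()
                .to("project-id:dataset-id.table-id")
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    // New in this commit: TableDestinations are emitted once the batch load
    // (and any temp-table rename) for that table has completed.
    PCollection<String> completed =
        result
            .getSuccessfulTableLoads()
            .apply(
                "FormatCompletedTables",
                MapElements.into(TypeDescriptors.strings())
                    .via((TableDestination td) -> "Finished loading " + td.getTableSpec()));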

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 6297223..98310eb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -356,30 +357,41 @@ class BatchLoads<DestinationT, ElementT>
     PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
         writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
 
-    tempTables
-        // Now that the load job has happened, we want the rename to happen immediately.
-        .apply(
-            "Window Into Global Windows",
-            Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
-                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
-        .apply("Add Void Key", WithKeys.of((Void) null))
-        .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
-        .apply("GroupByKey", GroupByKey.create())
-        .apply("Extract Values", Values.create())
-        .apply(
-            "WriteRenameTriggered",
-            ParDo.of(
-                    new WriteRename(
-                        bigQueryServices,
-                        copyJobIdPrefixView,
-                        writeDisposition,
-                        createDisposition,
-                        maxRetryJobs,
-                        kmsKey,
-                        loadJobProjectId))
-                .withSideInputs(copyJobIdPrefixView));
-    writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
-    return writeResult(p);
+    PCollection<TableDestination> successfulMultiPartitionWrites =
+        tempTables
+            // Now that the load job has happened, we want the rename to happen immediately.
+            .apply(
+                "Window Into Global Windows",
+                Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
+                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
+            .apply("Add Void Key", WithKeys.of((Void) null))
+            .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
+            .apply("GroupByKey", GroupByKey.create())
+            .apply("Extract Values", Values.create())
+            .apply(
+                "WriteRenameTriggered",
+                ParDo.of(
+                        new WriteRename(
+                            bigQueryServices,
+                            copyJobIdPrefixView,
+                            writeDisposition,
+                            createDisposition,
+                            maxRetryJobs,
+                            kmsKey,
+                            loadJobProjectId))
+                    .withSideInputs(copyJobIdPrefixView));
+
+    PCollection<TableDestination> successfulSinglePartitionWrites =
+        writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView)
+            .apply(
+                "RewindowSinglePartitionResults",
+                Window.<TableDestination>into(new GlobalWindows())
+                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))));
+
+    PCollectionList<TableDestination> allSuccessfulWrites =
+        PCollectionList.of(successfulMultiPartitionWrites).and(successfulSinglePartitionWrites);
+
+    return writeResult(p, allSuccessfulWrites.apply(Flatten.pCollections()));
   }
 
   // Expand the pipeline when the user has not requested periodically-triggered file writes.
@@ -428,25 +440,30 @@ class BatchLoads<DestinationT, ElementT>
                             rowWriterFactory))
                     .withSideInputs(tempFilePrefixView)
                     .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-    PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
-        writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
 
-    tempTables
-        .apply("ReifyRenameInput", new ReifyAsIterable<>())
-        .apply(
-            "WriteRenameUntriggered",
-            ParDo.of(
-                    new WriteRename(
-                        bigQueryServices,
-                        copyJobIdPrefixView,
-                        writeDisposition,
-                        createDisposition,
-                        maxRetryJobs,
-                        kmsKey,
-                        loadJobProjectId))
-                .withSideInputs(copyJobIdPrefixView));
-    writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
-    return writeResult(p);
+    PCollection<TableDestination> successfulSinglePartitionWrites =
+        writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
+
+    PCollection<TableDestination> successfulMultiPartitionWrites =
+        writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView)
+            .apply("ReifyRenameInput", new ReifyAsIterable<>())
+            .apply(
+                "WriteRenameUntriggered",
+                ParDo.of(
+                        new WriteRename(
+                            bigQueryServices,
+                            copyJobIdPrefixView,
+                            writeDisposition,
+                            createDisposition,
+                            maxRetryJobs,
+                            kmsKey,
+                            loadJobProjectId))
+                    .withSideInputs(copyJobIdPrefixView));
+
+    PCollectionList<TableDestination> allSuccessfulWrites =
+        PCollectionList.of(successfulSinglePartitionWrites).and(successfulMultiPartitionWrites);
+
+    return writeResult(p, allSuccessfulWrites.apply(Flatten.pCollections()));
   }
 
   // Generate the base job id string.
@@ -710,7 +727,7 @@ class BatchLoads<DestinationT, ElementT>
 
   // In the case where the files fit into a single load job, there's no need to write temporary
   // tables and rename. We can load these files directly into the target BigQuery table.
-  void writeSinglePartition(
+  PCollection<TableDestination> writeSinglePartition(
       PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
       PCollectionView<String> loadJobIdPrefixView) {
     List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
@@ -724,36 +741,46 @@ class BatchLoads<DestinationT, ElementT>
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
             WritePartition.ResultCoder.INSTANCE);
     // Write single partition to final table
-    input
-        .setCoder(partitionsCoder)
-        // Reshuffle will distribute this among multiple workers, and also guard against
-        // reexecution of the WritePartitions step once WriteTables has begun.
-        .apply("SinglePartitionsReshuffle", Reshuffle.of())
-        .apply(
-            "SinglePartitionWriteTables",
-            new WriteTables<>(
-                false,
-                bigQueryServices,
-                loadJobIdPrefixView,
-                writeDisposition,
-                createDisposition,
-                sideInputs,
-                dynamicDestinations,
-                loadJobProjectId,
-                maxRetryJobs,
-                ignoreUnknownValues,
-                kmsKey,
-                rowWriterFactory.getSourceFormat(),
-                useAvroLogicalTypes,
-                schemaUpdateOptions,
-                null))
-        .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
+    PCollection<KV<TableDestination, WriteTables.Result>> successfulWrites =
+        input
+            .setCoder(partitionsCoder)
+            // Reshuffle will distribute this among multiple workers, and also guard against
+            // reexecution of the WritePartitions step once WriteTables has begun.
+            .apply("SinglePartitionsReshuffle", Reshuffle.of())
+            .apply(
+                "SinglePartitionWriteTables",
+                new WriteTables<>(
+                    false,
+                    bigQueryServices,
+                    loadJobIdPrefixView,
+                    writeDisposition,
+                    createDisposition,
+                    sideInputs,
+                    dynamicDestinations,
+                    loadJobProjectId,
+                    maxRetryJobs,
+                    ignoreUnknownValues,
+                    kmsKey,
+                    rowWriterFactory.getSourceFormat(),
+                    useAvroLogicalTypes,
+                    schemaUpdateOptions,
+                    null))
+            .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
+
+    return successfulWrites.apply(Keys.create());
   }
 
-  private WriteResult writeResult(Pipeline p) {
+  private WriteResult writeResult(Pipeline p, PCollection<TableDestination> successfulWrites) {
     PCollection<TableRow> empty =
         p.apply("CreateEmptyFailedInserts", 
Create.empty(TypeDescriptor.of(TableRow.class)));
-    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty, null);
+
+    return WriteResult.in(
+        p,
+        new TupleTag<>("failedInserts"),
+        empty,
+        null,
+        new TupleTag<>("successfulInserts"),
+        successfulWrites);
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index e705088..dbc9133 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -165,6 +165,6 @@ public class StorageApiLoads<DestinationT, ElementT>
     // large.
     PCollection<TableRow> empty =
         p.apply("CreateEmptyFailedInserts", 
Create.empty(TypeDescriptor.of(TableRow.class)));
-    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty, null);
+    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty, null, 
null, null);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 915eb8a..2eed7e4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -301,7 +301,9 @@ public class StreamingWriteTables<ElementT>
           input.getPipeline(),
           failedInsertsTag,
           failedInserts,
-          result.get(BatchedStreamingWrite.SUCCESSFUL_ROWS_TAG));
+          result.get(BatchedStreamingWrite.SUCCESSFUL_ROWS_TAG),
+          null,
+          null);
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index b9965fa..354f8ef 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
@@ -53,7 +54,8 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>>, Void> {
+class WriteRename
+    extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>>, TableDestination> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
 
   private final BigQueryServices bqServices;
@@ -73,14 +75,17 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
     final BigQueryHelpers.PendingJob retryJob;
     final TableDestination tableDestination;
     final List<TableReference> tempTables;
+    final BoundedWindow window;
 
     public PendingJobData(
         BigQueryHelpers.PendingJob retryJob,
         TableDestination tableDestination,
-        List<TableReference> tempTables) {
+        List<TableReference> tempTables,
+        BoundedWindow window) {
       this.retryJob = retryJob;
       this.tableDestination = tableDestination;
       this.tempTables = tempTables;
+      this.window = window;
     }
   }
   // All pending copy jobs.
@@ -122,7 +127,9 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
 
   @ProcessElement
   public void processElement(
-      @Element Iterable<KV<TableDestination, WriteTables.Result>> element, ProcessContext c)
+      @Element Iterable<KV<TableDestination, WriteTables.Result>> element,
+      ProcessContext c,
+      BoundedWindow window)
       throws Exception {
     Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
     for (KV<TableDestination, WriteTables.Result> entry : element) {
@@ -133,7 +140,7 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
       // Process each destination table.
       // Do not copy if no temp tables are provided.
       if (!entry.getValue().isEmpty()) {
-        pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c));
+        pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
       }
     }
   }
@@ -155,6 +162,8 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
                         .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
                     pendingJob.tableDestination.getTableDescription());
               }
+              c.output(
+                  pendingJob.tableDestination, pendingJob.window.maxTimestamp(), pendingJob.window);
               removeTemporaryTables(datasetService, pendingJob.tempTables);
               return null;
             } catch (IOException | InterruptedException e) {
@@ -175,7 +184,8 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
   private PendingJobData startWriteRename(
       TableDestination finalTableDestination,
       Iterable<WriteTables.Result> tempTableNames,
-      ProcessContext c)
+      ProcessContext c,
+      BoundedWindow window)
       throws Exception {
     // The pane may have advanced either here due to triggering or due to an upstream trigger. We
     // check the upstream
@@ -211,7 +221,7 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
             createDisposition,
             kmsKey,
             loadJobProjectId);
-    return new PendingJobData(retryJob, finalTableDestination, tempTables);
+    return new PendingJobData(retryJob, finalTableDestination, tempTables, window);
   }
 
   private BigQueryHelpers.PendingJob startCopy(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index 85ef275..b5f2571 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -42,15 +42,26 @@ public final class WriteResult implements POutput {
   private final TupleTag<BigQueryInsertError> failedInsertsWithErrTag;
   private final PCollection<BigQueryInsertError> failedInsertsWithErr;
   private final PCollection<TableRow> successfulInserts;
+  private final TupleTag<TableDestination> successfulBatchInsertsTag;
+  private final PCollection<TableDestination> successfulBatchInserts;
 
   /** Creates a {@link WriteResult} in the given {@link Pipeline}. */
   static WriteResult in(
       Pipeline pipeline,
       TupleTag<TableRow> failedInsertsTag,
       PCollection<TableRow> failedInserts,
-      @Nullable PCollection<TableRow> successfulInserts) {
+      @Nullable PCollection<TableRow> successfulInserts,
+      @Nullable TupleTag<TableDestination> successfulBatchInsertsTag,
+      @Nullable PCollection<TableDestination> successfulBatchInserts) {
     return new WriteResult(
-        pipeline, failedInsertsTag, failedInserts, null, null, successfulInserts);
+        pipeline,
+        failedInsertsTag,
+        failedInserts,
+        null,
+        null,
+        successfulInserts,
+        successfulBatchInsertsTag,
+        successfulBatchInserts);
   }
 
   static WriteResult withExtendedErrors(
@@ -59,16 +70,24 @@ public final class WriteResult implements POutput {
       PCollection<BigQueryInsertError> failedInserts,
       PCollection<TableRow> successfulInserts) {
     return new WriteResult(
-        pipeline, null, null, failedInsertsTag, failedInserts, successfulInserts);
+        pipeline, null, null, failedInsertsTag, failedInserts, successfulInserts, null, null);
   }
 
   @Override
   public Map<TupleTag<?>, PValue> expand() {
+    ImmutableMap.Builder<TupleTag<?>, PValue> output = ImmutableMap.builder();
+
     if (failedInsertsTag != null) {
-      return ImmutableMap.of(failedInsertsTag, failedInserts);
+      output.put(failedInsertsTag, failedInserts);
     } else {
-      return ImmutableMap.of(failedInsertsWithErrTag, failedInsertsWithErr);
+      output.put(failedInsertsWithErrTag, failedInsertsWithErr);
+    }
+
+    if (successfulBatchInserts != null) {
+      output.put(successfulBatchInsertsTag, successfulBatchInserts);
     }
+
+    return output.build();
   }
 
   private WriteResult(
@@ -77,17 +96,45 @@ public final class WriteResult implements POutput {
       PCollection<TableRow> failedInserts,
       TupleTag<BigQueryInsertError> failedInsertsWithErrTag,
       PCollection<BigQueryInsertError> failedInsertsWithErr,
-      PCollection<TableRow> successfulInserts) {
+      PCollection<TableRow> successfulInserts,
+      TupleTag<TableDestination> successfulInsertsTag,
+      PCollection<TableDestination> successfulBatchInserts) {
     this.pipeline = pipeline;
     this.failedInsertsTag = failedInsertsTag;
     this.failedInserts = failedInserts;
     this.failedInsertsWithErrTag = failedInsertsWithErrTag;
     this.failedInsertsWithErr = failedInsertsWithErr;
     this.successfulInserts = successfulInserts;
+    this.successfulBatchInsertsTag = successfulInsertsTag;
+    this.successfulBatchInserts = successfulBatchInserts;
+  }
+
+  /**
+   * Returns a {@link PCollection} containing the {@link TableDestination}s that were successfully
+   * loaded using the batch load API.
+   */
+  public PCollection<TableDestination> getSuccessfulTableLoads() {
+    checkArgument(
+        successfulBatchInsertsTag != null,
+        "Cannot use getSuccessfulTableLoads because this WriteResult was not "
+            + "configured to produce them.  Note: only batch loads produce 
successfulTableLoads.");
+
+    return successfulBatchInserts;
   }
 
   /**
-   * Returns a {@link PCollection} containing the {@link TableRow}s that didn't made it to BQ.
+   * Returns a {@link PCollection} containing the {@link TableRow}s that were written to BQ via the
+   * streaming insert API.
+   */
+  public PCollection<TableRow> getSuccessfulInserts() {
+    checkArgument(
+        successfulInserts != null,
+        "Retrieving successful inserts is only supported for streaming 
inserts.");
+    return successfulInserts;
+  }
+
+  /**
+   * Returns a {@link PCollection} containing the {@link TableRow}s that didn't make it to BQ.
    *
    * <p>Only use this method if you haven't enabled {@link
    * BigQueryIO.Write#withExtendedErrorInfo()}. Otherwise use {@link
@@ -101,19 +148,11 @@ public final class WriteResult implements POutput {
     return failedInserts;
   }
 
-  /** Returns a {@link PCollection} containing the {@link TableRow}s that were written to BQ. */
-  public PCollection<TableRow> getSuccessfulInserts() {
-    checkArgument(
-        successfulInserts != null,
-        "Retrieving successful inserts is only supported for streaming 
inserts.");
-    return successfulInserts;
-  }
-
   /**
   * Returns a {@link PCollection} containing the {@link BigQueryInsertError}s with detailed error
    * information.
    *
-   * <p>Only use this method if you have enabled {@link BigQueryIO.Write#withExtendedErrorInfo()}. *
+   * <p>Only use this method if you have enabled {@link BigQueryIO.Write#withExtendedErrorInfo()}.
    * Otherwise use {@link WriteResult#getFailedInserts()}
    */
   public PCollection<BigQueryInsertError> getFailedInsertsWithErr() {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index aa8951e..df34790 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -99,6 +99,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -408,7 +409,28 @@ public class BigQueryIOWriteTest implements Serializable {
     if (autoSharding) {
       write = write.withAutoSharding();
     }
-    users.apply("WriteBigQuery", write);
+    WriteResult results = users.apply("WriteBigQuery", write);
+
+    if (!useStreaming && !useStorageApi) {
+      PCollection<TableDestination> successfulBatchInserts = results.getSuccessfulTableLoads();
+      TableDestination[] expectedTables =
+          userList.stream()
+              .map(
+                  user -> {
+                    Matcher matcher = userPattern.matcher(user);
+                    checkState(matcher.matches());
+                    String userId = matcher.group(2);
+                    return new TableDestination(
+                        String.format("project-id:dataset-id.userid-%s$20171127", userId),
+                        String.format("table for userid %s", userId));
+                  })
+              .distinct()
+              .toArray(TableDestination[]::new);
+
+      PAssert.that(successfulBatchInserts.apply(Distinct.create()))
+          .containsInAnyOrder(expectedTables);
+    }
+
     p.run();
 
     Map<Long, List<TableRow>> expectedTableRows = Maps.newHashMap();
@@ -945,6 +967,72 @@ public class BigQueryIOWriteTest implements Serializable {
   }
 
   @Test
+  public void testWriteWithSuccessfulBatchInserts() throws Exception {
+    if (useStreaming || useStorageApi) {
+      return;
+    }
+
+    WriteResult result =
+        p.apply(
+                Create.of(
+                        new TableRow().set("name", "a").set("number", 1),
+                        new TableRow().set("name", "b").set("number", 2),
+                        new TableRow().set("name", "c").set("number", 3))
+                    .withCoder(TableRowJsonCoder.of()))
+            .apply(
+                BigQueryIO.writeTableRows()
+                    .to("dataset-id.table-id")
+                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                    .withSchema(
+                        new TableSchema()
+                            .setFields(
+                                ImmutableList.of(
+                                    new TableFieldSchema().setName("name").setType("STRING"),
+                                    new TableFieldSchema().setName("number").setType("INTEGER"))))
+                    .withTestServices(fakeBqServices)
+                    .withoutValidation());
+
+    PAssert.that(result.getSuccessfulTableLoads())
+        .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null));
+
+    p.run();
+  }
+
+  @Test
+  public void testWriteWithSuccessfulBatchInsertsAndWriteRename() throws Exception {
+    if (useStreaming || useStorageApi) {
+      return;
+    }
+
+    WriteResult result =
+        p.apply(
+                Create.of(
+                        new TableRow().set("name", "a").set("number", 1),
+                        new TableRow().set("name", "b").set("number", 2),
+                        new TableRow().set("name", "c").set("number", 3))
+                    .withCoder(TableRowJsonCoder.of()))
+            .apply(
+                BigQueryIO.writeTableRows()
+                    .to("dataset-id.table-id")
+                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                    .withSchema(
+                        new TableSchema()
+                            .setFields(
+                                ImmutableList.of(
+                                    new TableFieldSchema().setName("name").setType("STRING"),
+                                    new TableFieldSchema().setName("number").setType("INTEGER"))))
+                    .withMaxFileSize(1)
+                    .withMaxFilesPerPartition(1)
+                    .withTestServices(fakeBqServices)
+                    .withoutValidation());
+
+    PAssert.that(result.getSuccessfulTableLoads())
+        .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null));
+
+    p.run();
+  }
+
+  @Test
   public void testWriteWithoutInsertId() throws Exception {
     if (useStorageApi || !useStreaming) {
       return;
@@ -2088,7 +2176,7 @@ public class BigQueryIOWriteTest implements Serializable {
     for (int i = 0; i < numFinalTables; ++i) {
       String tableName = "project-id:dataset-id.table_" + i;
       TableDestination tableDestination = new TableDestination(tableName, "table_" + i + "_desc");
-      for (int j = 0; i < numTempTablesPerFinalTable; ++i) {
+      for (int j = 0; j < numTempTablesPerFinalTable; ++j) {
         TableReference tempTable =
             new TableReference()
                 .setProjectId("project-id")
@@ -2122,7 +2210,7 @@ public class BigQueryIOWriteTest implements Serializable {
             "kms_key",
             null);
 
-    DoFnTester<Iterable<KV<TableDestination, WriteTables.Result>>, Void> tester =
+    DoFnTester<Iterable<KV<TableDestination, WriteTables.Result>>, TableDestination> tester =
         DoFnTester.of(writeRename);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.processElement(tempTablesElement);
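
A note on the WriteRename change above: it defers its output until the copy jobs finish, so the DoFn now has to remember each element's window in order to emit the TableDestination back into it. The following is a simplified, hypothetical DoFn showing that buffer-then-emit pattern; it is not the actual WriteRename logic, and the class and field names are illustrative only.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

    // Hypothetical example: buffer elements with their windows, then output at
    // bundle finalization into each element's original window.
    class EmitAtBundleEnd extends DoFn<String, String> {
      private static class Pending {
        final String value;
        final BoundedWindow window;

        Pending(String value, BoundedWindow window) {
          this.value = value;
          this.window = window;
        }
      }

      private transient List<Pending> pending;

      @StartBundle
      public void startBundle() {
        pending = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(@Element String element, BoundedWindow window) {
        // Buffer the element together with the window it arrived in.
        pending.add(new Pending(element, window));
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext c) {
        for (Pending p : pending) {
          // Emit into the original window, at that window's maximum timestamp,
          // mirroring how WriteRename now outputs its TableDestinations.
          c.output(p.value, p.window.maxTimestamp(), p.window);
        }
      }
    }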
