This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new bc39244  [BEAM-13689] Output token elements when BQ batch writes complete.
     new acd1d52  Merge pull request #16561 from [BEAM-13689] Output TableDestination elements when BQ batch writes complete.
bc39244 is described below

commit bc39244aae33b1558e5388082d196f3a7d441921
Author: Steve Niemitz <steveniem...@gmail.com>
AuthorDate: Mon Feb 1 09:06:08 2021 -0500

    [BEAM-13689] Output token elements when BQ batch writes complete.
---
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java       | 165 ++++++++++++---------
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |   2 +-
 .../sdk/io/gcp/bigquery/StreamingWriteTables.java  |   4 +-
 .../beam/sdk/io/gcp/bigquery/WriteRename.java      |  22 ++-
 .../beam/sdk/io/gcp/bigquery/WriteResult.java      |  71 +++++++--
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  94 +++++++++++-
 6 files changed, 262 insertions(+), 96 deletions(-)
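For context on what this change enables: once it lands, a pipeline that uses batch file loads can observe per-table completion from the returned WriteResult. A minimal consumption sketch follows; the table spec, the tableSchema variable, and the logging step are illustrative placeholders, not part of this commit:

    // Hypothetical usage; "my-project:my_dataset.my_table" and tableSchema are placeholders.
    WriteResult result =
        rows.apply(
            "WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withSchema(tableSchema) // assumed TableSchema defined elsewhere
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS));

    // New in this commit: one TableDestination element is emitted per destination
    // table, only after its load job (and any temp-table copy/rename) completes.
    result
        .getSuccessfulTableLoads()
        .apply(
            "LogFinishedTables",
            MapElements.into(TypeDescriptors.strings())
                .via((TableDestination dest) -> "Finished loading: " + dest.getTableSpec()));

This makes it possible to sequence downstream work, for example kicking off a query or publishing a notification, only after a destination table is fully written.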
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 6297223..98310eb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -356,30 +357,41 @@ class BatchLoads<DestinationT, ElementT>
     PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
         writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
 
-    tempTables
-        // Now that the load job has happened, we want the rename to happen immediately.
-        .apply(
-            "Window Into Global Windows",
-            Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
-                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
-        .apply("Add Void Key", WithKeys.of((Void) null))
-        .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
-        .apply("GroupByKey", GroupByKey.create())
-        .apply("Extract Values", Values.create())
-        .apply(
-            "WriteRenameTriggered",
-            ParDo.of(
-                    new WriteRename(
-                        bigQueryServices,
-                        copyJobIdPrefixView,
-                        writeDisposition,
-                        createDisposition,
-                        maxRetryJobs,
-                        kmsKey,
-                        loadJobProjectId))
-                .withSideInputs(copyJobIdPrefixView));
-    writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
-    return writeResult(p);
+    PCollection<TableDestination> successfulMultiPartitionWrites =
+        tempTables
+            // Now that the load job has happened, we want the rename to happen immediately.
+            .apply(
+                "Window Into Global Windows",
+                Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
+                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
+            .apply("Add Void Key", WithKeys.of((Void) null))
+            .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
+            .apply("GroupByKey", GroupByKey.create())
+            .apply("Extract Values", Values.create())
+            .apply(
+                "WriteRenameTriggered",
+                ParDo.of(
+                        new WriteRename(
+                            bigQueryServices,
+                            copyJobIdPrefixView,
+                            writeDisposition,
+                            createDisposition,
+                            maxRetryJobs,
+                            kmsKey,
+                            loadJobProjectId))
+                    .withSideInputs(copyJobIdPrefixView));
+
+    PCollection<TableDestination> successfulSinglePartitionWrites =
+        writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView)
+            .apply(
+                "RewindowSinglePartitionResults",
+                Window.<TableDestination>into(new GlobalWindows())
+                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))));
+
+    PCollectionList<TableDestination> allSuccessfulWrites =
+        PCollectionList.of(successfulMultiPartitionWrites).and(successfulSinglePartitionWrites);
+
+    return writeResult(p, allSuccessfulWrites.apply(Flatten.pCollections()));
   }
 
   // Expand the pipeline when the user has not requested periodically-triggered file writes.
@@ -428,25 +440,30 @@ class BatchLoads<DestinationT, ElementT>
                     rowWriterFactory))
             .withSideInputs(tempFilePrefixView)
             .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-    PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
-        writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
-
-    tempTables
-        .apply("ReifyRenameInput", new ReifyAsIterable<>())
-        .apply(
-            "WriteRenameUntriggered",
-            ParDo.of(
-                    new WriteRename(
-                        bigQueryServices,
-                        copyJobIdPrefixView,
-                        writeDisposition,
-                        createDisposition,
-                        maxRetryJobs,
-                        kmsKey,
-                        loadJobProjectId))
-                .withSideInputs(copyJobIdPrefixView));
-    writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
-    return writeResult(p);
+    PCollection<TableDestination> successfulSinglePartitionWrites =
+        writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
+
+    PCollection<TableDestination> successfulMultiPartitionWrites =
+        writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView)
+            .apply("ReifyRenameInput", new ReifyAsIterable<>())
+            .apply(
+                "WriteRenameUntriggered",
+                ParDo.of(
+                        new WriteRename(
+                            bigQueryServices,
+                            copyJobIdPrefixView,
+                            writeDisposition,
+                            createDisposition,
+                            maxRetryJobs,
+                            kmsKey,
+                            loadJobProjectId))
+                    .withSideInputs(copyJobIdPrefixView));
+
+    PCollectionList<TableDestination> allSuccessfulWrites =
+        PCollectionList.of(successfulSinglePartitionWrites).and(successfulMultiPartitionWrites);
+
+    return writeResult(p, allSuccessfulWrites.apply(Flatten.pCollections()));
   }
 
   // Generate the base job id string.
@@ -710,7 +727,7 @@ class BatchLoads<DestinationT, ElementT>
 
   // In the case where the files fit into a single load job, there's no need to write temporary
   // tables and rename. We can load these files directly into the target BigQuery table.
-  void writeSinglePartition(
+  PCollection<TableDestination> writeSinglePartition(
       PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
       PCollectionView<String> loadJobIdPrefixView) {
     List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
@@ -724,36 +741,46 @@ class BatchLoads<DestinationT, ElementT>
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
             WritePartition.ResultCoder.INSTANCE);
     // Write single partition to final table
-    input
-        .setCoder(partitionsCoder)
-        // Reshuffle will distribute this among multiple workers, and also guard against
-        // reexecution of the WritePartitions step once WriteTables has begun.
-        .apply("SinglePartitionsReshuffle", Reshuffle.of())
-        .apply(
-            "SinglePartitionWriteTables",
-            new WriteTables<>(
-                false,
-                bigQueryServices,
-                loadJobIdPrefixView,
-                writeDisposition,
-                createDisposition,
-                sideInputs,
-                dynamicDestinations,
-                loadJobProjectId,
-                maxRetryJobs,
-                ignoreUnknownValues,
-                kmsKey,
-                rowWriterFactory.getSourceFormat(),
-                useAvroLogicalTypes,
-                schemaUpdateOptions,
-                null))
-        .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
+    PCollection<KV<TableDestination, WriteTables.Result>> successfulWrites =
+        input
+            .setCoder(partitionsCoder)
+            // Reshuffle will distribute this among multiple workers, and also guard against
+            // reexecution of the WritePartitions step once WriteTables has begun.
+            .apply("SinglePartitionsReshuffle", Reshuffle.of())
+            .apply(
+                "SinglePartitionWriteTables",
+                new WriteTables<>(
+                    false,
+                    bigQueryServices,
+                    loadJobIdPrefixView,
+                    writeDisposition,
+                    createDisposition,
+                    sideInputs,
+                    dynamicDestinations,
+                    loadJobProjectId,
+                    maxRetryJobs,
+                    ignoreUnknownValues,
+                    kmsKey,
+                    rowWriterFactory.getSourceFormat(),
+                    useAvroLogicalTypes,
+                    schemaUpdateOptions,
+                    null))
+            .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
+
+    return successfulWrites.apply(Keys.create());
   }
 
-  private WriteResult writeResult(Pipeline p) {
+  private WriteResult writeResult(Pipeline p, PCollection<TableDestination> successfulWrites) {
     PCollection<TableRow> empty =
         p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
-    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty, null);
+
+    return WriteResult.in(
+        p,
+        new TupleTag<>("failedInserts"),
+        empty,
+        null,
+        new TupleTag<>("successfulInserts"),
+        successfulWrites);
   }
 
   @Override
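A note on the triggered path above: Flatten requires its inputs to share a windowing strategy, so the single-partition results are re-windowed into the global window with the same per-element trigger already used on the multi-partition branch, letting both fire as soon as each table finishes. A generic sketch of that re-windowing pattern is below; the class and method names are invented for illustration, and the explicit allowed lateness and accumulation mode are included so the snippet compiles standalone (inside BatchLoads they are inherited from the upstream strategy):

    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    final class RewindowUtil {
      // Re-window a PCollection so each element can fire immediately in the
      // global window, matching the strategy of the other Flatten input.
      static <T> PCollection<T> rewindowForImmediateFiring(PCollection<T> input, String name) {
        return input.apply(
            name,
            Window.<T>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());
      }

      private RewindowUtil() {}
    }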
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index e705088..dbc9133 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -165,6 +165,6 @@ public class StorageApiLoads<DestinationT, ElementT>
     // large.
     PCollection<TableRow> empty =
         p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
-    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty, null);
+    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty, null, null, null);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 915eb8a..2eed7e4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -301,7 +301,9 @@ public class StreamingWriteTables<ElementT>
         input.getPipeline(),
         failedInsertsTag,
         failedInserts,
-        result.get(BatchedStreamingWrite.SUCCESSFUL_ROWS_TAG));
+        result.get(BatchedStreamingWrite.SUCCESSFUL_ROWS_TAG),
+        null,
+        null);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index b9965fa..354f8ef 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
@@ -53,7 +54,8 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>>, Void> {
+class WriteRename
+    extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>>, TableDestination> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
 
   private final BigQueryServices bqServices;
@@ -73,14 +75,17 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
     final BigQueryHelpers.PendingJob retryJob;
     final TableDestination tableDestination;
     final List<TableReference> tempTables;
+    final BoundedWindow window;
 
     public PendingJobData(
         BigQueryHelpers.PendingJob retryJob,
         TableDestination tableDestination,
-        List<TableReference> tempTables) {
+        List<TableReference> tempTables,
+        BoundedWindow window) {
       this.retryJob = retryJob;
       this.tableDestination = tableDestination;
      this.tempTables = tempTables;
+      this.window = window;
     }
   }
   // All pending copy jobs.
@@ -122,7 +127,9 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
 
   @ProcessElement
   public void processElement(
-      @Element Iterable<KV<TableDestination, WriteTables.Result>> element, ProcessContext c)
+      @Element Iterable<KV<TableDestination, WriteTables.Result>> element,
+      ProcessContext c,
+      BoundedWindow window)
       throws Exception {
     Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
     for (KV<TableDestination, WriteTables.Result> entry : element) {
@@ -133,7 +140,7 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
       // Process each destination table.
       // Do not copy if no temp tables are provided.
       if (!entry.getValue().isEmpty()) {
-        pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c));
+        pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
       }
     }
   }
@@ -155,6 +162,8 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
                     .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
                 pendingJob.tableDestination.getTableDescription());
           }
+          c.output(
+              pendingJob.tableDestination, pendingJob.window.maxTimestamp(), pendingJob.window);
           removeTemporaryTables(datasetService, pendingJob.tempTables);
           return null;
         } catch (IOException | InterruptedException e) {
@@ -175,7 +184,8 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
   private PendingJobData startWriteRename(
       TableDestination finalTableDestination,
       Iterable<WriteTables.Result> tempTableNames,
-      ProcessContext c)
+      ProcessContext c,
+      BoundedWindow window)
       throws Exception {
     // The pane may have advanced either here due to triggering or due to an upstream trigger. We
     // check the upstream
@@ -211,7 +221,7 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>
             createDisposition,
             kmsKey,
             loadJobProjectId);
-    return new PendingJobData(retryJob, finalTableDestination, tempTables);
+    return new PendingJobData(retryJob, finalTableDestination, tempTables, window);
  }
 
   private BigQueryHelpers.PendingJob startCopy(
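The BoundedWindow plumbing above exists because WriteRename starts copy jobs in @ProcessElement but only emits results once those jobs finish, at which point no default window or timestamp is available, so each output must name its window and timestamp explicitly. A stripped-down illustration of the same pattern follows; the class name and String element types are invented for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;

    class DeferredOutputFn extends DoFn<String, String> {
      private transient List<KV<String, BoundedWindow>> pending;

      @StartBundle
      public void startBundle() {
        pending = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(@Element String element, BoundedWindow window) {
        // Defer the output, remembering the window the element arrived in.
        pending.add(KV.of(element, window));
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext c) {
        for (KV<String, BoundedWindow> kv : pending) {
          // Emit explicitly into the captured window at its max timestamp,
          // mirroring WriteRename's c.output(dest, window.maxTimestamp(), window).
          c.output(kv.getKey(), kv.getValue().maxTimestamp(), kv.getValue());
        }
      }
    }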
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index 85ef275..b5f2571 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -42,15 +42,26 @@ public final class WriteResult implements POutput {
   private final TupleTag<BigQueryInsertError> failedInsertsWithErrTag;
   private final PCollection<BigQueryInsertError> failedInsertsWithErr;
   private final PCollection<TableRow> successfulInserts;
+  private final TupleTag<TableDestination> successfulBatchInsertsTag;
+  private final PCollection<TableDestination> successfulBatchInserts;
 
   /** Creates a {@link WriteResult} in the given {@link Pipeline}. */
   static WriteResult in(
       Pipeline pipeline,
       TupleTag<TableRow> failedInsertsTag,
       PCollection<TableRow> failedInserts,
-      @Nullable PCollection<TableRow> successfulInserts) {
+      @Nullable PCollection<TableRow> successfulInserts,
+      @Nullable TupleTag<TableDestination> successfulBatchInsertsTag,
+      @Nullable PCollection<TableDestination> successfulBatchInserts) {
     return new WriteResult(
-        pipeline, failedInsertsTag, failedInserts, null, null, successfulInserts);
+        pipeline,
+        failedInsertsTag,
+        failedInserts,
+        null,
+        null,
+        successfulInserts,
+        successfulBatchInsertsTag,
+        successfulBatchInserts);
   }
 
   static WriteResult withExtendedErrors(
@@ -59,16 +70,24 @@ public final class WriteResult implements POutput {
       PCollection<BigQueryInsertError> failedInserts,
       PCollection<TableRow> successfulInserts) {
     return new WriteResult(
-        pipeline, null, null, failedInsertsTag, failedInserts, successfulInserts);
+        pipeline, null, null, failedInsertsTag, failedInserts, successfulInserts, null, null);
   }
 
   @Override
   public Map<TupleTag<?>, PValue> expand() {
+    ImmutableMap.Builder<TupleTag<?>, PValue> output = ImmutableMap.builder();
+
     if (failedInsertsTag != null) {
-      return ImmutableMap.of(failedInsertsTag, failedInserts);
+      output.put(failedInsertsTag, failedInserts);
     } else {
-      return ImmutableMap.of(failedInsertsWithErrTag, failedInsertsWithErr);
+      output.put(failedInsertsWithErrTag, failedInsertsWithErr);
+    }
+
+    if (successfulBatchInserts != null) {
+      output.put(successfulBatchInsertsTag, successfulBatchInserts);
     }
+
+    return output.build();
   }
 
   private WriteResult(
@@ -77,17 +96,45 @@ public final class WriteResult implements POutput {
       PCollection<TableRow> failedInserts,
       TupleTag<BigQueryInsertError> failedInsertsWithErrTag,
       PCollection<BigQueryInsertError> failedInsertsWithErr,
-      PCollection<TableRow> successfulInserts) {
+      PCollection<TableRow> successfulInserts,
+      TupleTag<TableDestination> successfulInsertsTag,
+      PCollection<TableDestination> successfulBatchInserts) {
     this.pipeline = pipeline;
     this.failedInsertsTag = failedInsertsTag;
     this.failedInserts = failedInserts;
     this.failedInsertsWithErrTag = failedInsertsWithErrTag;
     this.failedInsertsWithErr = failedInsertsWithErr;
     this.successfulInserts = successfulInserts;
+    this.successfulBatchInsertsTag = successfulInsertsTag;
+    this.successfulBatchInserts = successfulBatchInserts;
   }
 
+  /**
+   * Returns a {@link PCollection} containing the {@link TableDestination}s that were successfully
+   * loaded using the batch load API.
+   */
+  public PCollection<TableDestination> getSuccessfulTableLoads() {
+    checkArgument(
+        successfulBatchInsertsTag != null,
+        "Cannot use getSuccessfulTableLoads because this WriteResult was not "
+            + "configured to produce them. Note: only batch loads produce successfulTableLoads.");
+
+    return successfulBatchInserts;
+  }
+
   /**
-   * Returns a {@link PCollection} containing the {@link TableRow}s that didn't made it to BQ.
+   * Returns a {@link PCollection} containing the {@link TableRow}s that were written to BQ via the
+   * streaming insert API.
+   */
+  public PCollection<TableRow> getSuccessfulInserts() {
+    checkArgument(
+        successfulInserts != null,
+        "Retrieving successful inserts is only supported for streaming inserts.");
+    return successfulInserts;
+  }
+
+  /**
+   * Returns a {@link PCollection} containing the {@link TableRow}s that didn't make it to BQ.
    *
    * <p>Only use this method if you haven't enabled {@link
    * BigQueryIO.Write#withExtendedErrorInfo()}. Otherwise use {@link
@@ -101,19 +148,11 @@ public final class WriteResult implements POutput {
     return failedInserts;
   }
 
-  /** Returns a {@link PCollection} containing the {@link TableRow}s that were written to BQ. */
-  public PCollection<TableRow> getSuccessfulInserts() {
-    checkArgument(
-        successfulInserts != null,
-        "Retrieving successful inserts is only supported for streaming inserts.");
-    return successfulInserts;
-  }
-
   /**
    * Returns a {@link PCollection} containing the {@link BigQueryInsertError}s with detailed error
    * information.
    *
-   * <p>Only use this method if you have enabled {@link BigQueryIO.Write#withExtendedErrorInfo()}. *
+   * <p>Only use this method if you have enabled {@link BigQueryIO.Write#withExtendedErrorInfo()}.
    * Otherwise use {@link WriteResult#getFailedInserts()}
    */
   public PCollection<BigQueryInsertError> getFailedInsertsWithErr() {
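With this change WriteResult exposes two success outputs, each populated only for the matching write method: getSuccessfulTableLoads() for batch file loads, and getSuccessfulInserts() for streaming inserts; calling the wrong accessor trips the checkArgument precondition above. A hedged sketch of branching on the configured method, where writeResult and method are assumed local variables rather than anything defined in this commit:

    // Assumed: method is the BigQueryIO.Write.Method the pipeline was built with.
    if (method == BigQueryIO.Write.Method.FILE_LOADS) {
      // One element per destination table, after its load/rename completes.
      PCollection<TableDestination> tables = writeResult.getSuccessfulTableLoads();
    } else if (method == BigQueryIO.Write.Method.STREAMING_INSERTS) {
      // One element per successfully streamed row.
      PCollection<TableRow> rows = writeResult.getSuccessfulInserts();
    }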
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index aa8951e..df34790 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -99,6 +99,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -408,7 +409,28 @@ public class BigQueryIOWriteTest implements Serializable {
     if (autoSharding) {
       write = write.withAutoSharding();
     }
-    users.apply("WriteBigQuery", write);
+    WriteResult results = users.apply("WriteBigQuery", write);
+
+    if (!useStreaming && !useStorageApi) {
+      PCollection<TableDestination> successfulBatchInserts = results.getSuccessfulTableLoads();
+      TableDestination[] expectedTables =
+          userList.stream()
+              .map(
+                  user -> {
+                    Matcher matcher = userPattern.matcher(user);
+                    checkState(matcher.matches());
+                    String userId = matcher.group(2);
+                    return new TableDestination(
+                        String.format("project-id:dataset-id.userid-%s$20171127", userId),
+                        String.format("table for userid %s", userId));
+                  })
+              .distinct()
+              .toArray(TableDestination[]::new);
+
+      PAssert.that(successfulBatchInserts.apply(Distinct.create()))
+          .containsInAnyOrder(expectedTables);
+    }
+
     p.run();
 
     Map<Long, List<TableRow>> expectedTableRows = Maps.newHashMap();
@@ -945,6 +967,72 @@ public class BigQueryIOWriteTest implements Serializable {
   }
 
   @Test
+  public void testWriteWithSuccessfulBatchInserts() throws Exception {
+    if (useStreaming || useStorageApi) {
+      return;
+    }
+
+    WriteResult result =
+        p.apply(
+                Create.of(
+                        new TableRow().set("name", "a").set("number", 1),
+                        new TableRow().set("name", "b").set("number", 2),
+                        new TableRow().set("name", "c").set("number", 3))
+                    .withCoder(TableRowJsonCoder.of()))
+            .apply(
+                BigQueryIO.writeTableRows()
+                    .to("dataset-id.table-id")
+                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                    .withSchema(
+                        new TableSchema()
+                            .setFields(
+                                ImmutableList.of(
+                                    new TableFieldSchema().setName("name").setType("STRING"),
+                                    new TableFieldSchema().setName("number").setType("INTEGER"))))
+                    .withTestServices(fakeBqServices)
+                    .withoutValidation());
+
+    PAssert.that(result.getSuccessfulTableLoads())
+        .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null));
+
+    p.run();
+  }
+
+  @Test
+  public void testWriteWithSuccessfulBatchInsertsAndWriteRename() throws Exception {
+    if (useStreaming || useStorageApi) {
+      return;
+    }
+
+    WriteResult result =
+        p.apply(
+                Create.of(
+                        new TableRow().set("name", "a").set("number", 1),
+                        new TableRow().set("name", "b").set("number", 2),
+                        new TableRow().set("name", "c").set("number", 3))
+                    .withCoder(TableRowJsonCoder.of()))
+            .apply(
+                BigQueryIO.writeTableRows()
+                    .to("dataset-id.table-id")
+                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                    .withSchema(
+                        new TableSchema()
+                            .setFields(
+                                ImmutableList.of(
+                                    new TableFieldSchema().setName("name").setType("STRING"),
+                                    new TableFieldSchema().setName("number").setType("INTEGER"))))
+                    .withMaxFileSize(1)
+                    .withMaxFilesPerPartition(1)
+                    .withTestServices(fakeBqServices)
+                    .withoutValidation());
+
+    PAssert.that(result.getSuccessfulTableLoads())
+        .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null));
+
+    p.run();
+  }
+
+  @Test
   public void testWriteWithoutInsertId() throws Exception {
     if (useStorageApi || !useStreaming) {
       return;
     }
@@ -2088,7 +2176,7 @@ public class BigQueryIOWriteTest implements Serializable {
     for (int i = 0; i < numFinalTables; ++i) {
       String tableName = "project-id:dataset-id.table_" + i;
       TableDestination tableDestination = new TableDestination(tableName, "table_" + i + "_desc");
-      for (int j = 0; i < numTempTablesPerFinalTable; ++i) {
+      for (int j = 0; j < numTempTablesPerFinalTable; ++j) {
         TableReference tempTable =
             new TableReference()
                 .setProjectId("project-id")
@@ -2122,7 +2210,7 @@ public class BigQueryIOWriteTest implements Serializable {
             "kms_key",
             null);
 
-    DoFnTester<Iterable<KV<TableDestination, WriteTables.Result>>, Void> tester =
+    DoFnTester<Iterable<KV<TableDestination, WriteTables.Result>>, TableDestination> tester =
         DoFnTester.of(writeRename);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.processElement(tempTablesElement);