Repository: beam Updated Branches: refs/heads/master 0f8e8dd70 -> f7e8f886c
Allow users to choose the BigQuery insertion method. If choosing file load jobs on an unbounded PCollection, a triggering frequency must be specified to control how often load jobs are generated. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/075d4d45 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/075d4d45 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/075d4d45 Branch: refs/heads/master Commit: 075d4d45a9cd398f3b4023b6efd495cc58eb9bdd Parents: 0f8e8dd Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com> Authored: Sun Jul 30 11:17:39 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon Aug 14 14:32:10 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 447 +++++++++++++------ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 168 ++++++- .../sdk/io/gcp/bigquery/ReifyAsIterable.java | 51 +++ .../io/gcp/bigquery/WriteBundlesToFiles.java | 15 +- .../sdk/io/gcp/bigquery/WritePartition.java | 13 +- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 111 +++-- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 38 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 269 ++++++----- 8 files changed, 770 insertions(+), 342 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index e46b1d3..0a1306d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.TableRow; @@ -26,9 +27,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; import java.util.List; -import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; @@ -48,9 +50,15 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; 
import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; @@ -62,6 +70,7 @@ import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +102,12 @@ class BatchLoads<DestinationT> // The maximum size of a single file - 4TiB, just under the 5 TiB limit. static final long DEFAULT_MAX_FILE_SIZE = 4 * (1L << 40); + static final int DEFAULT_NUM_FILE_SHARDS = 0; + + // If user triggering is supplied, we will trigger the file write after this many records are + // written. + static final int FILE_TRIGGERING_RECORD_COUNT = 500000; + // The maximum number of retries to poll the status of a job. // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; @@ -110,6 +125,8 @@ class BatchLoads<DestinationT> private final Coder<DestinationT> destinationCoder; private int maxNumWritersPerBundle; private long maxFileSize; + private int numFileShards; + private Duration triggeringFrequency; BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition, boolean singletonTable, @@ -123,6 +140,8 @@ class BatchLoads<DestinationT> this.destinationCoder = destinationCoder; this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE; this.maxFileSize = DEFAULT_MAX_FILE_SIZE; + this.numFileShards = DEFAULT_NUM_FILE_SHARDS; + this.triggeringFrequency = null; } void setTestServices(BigQueryServices bigQueryServices) { @@ -139,6 +158,14 @@ class BatchLoads<DestinationT> this.maxNumWritersPerBundle = maxNumWritersPerBundle; } + public void setTriggeringFrequency(Duration triggeringFrequency) { + this.triggeringFrequency = triggeringFrequency; + } + + public void setNumFileShards(int numFileShards) { + this.numFileShards = numFileShards; + } + @VisibleForTesting void setMaxFileSize(long maxFileSize) { this.maxFileSize = maxFileSize; @@ -164,171 +191,323 @@ class BatchLoads<DestinationT> } } - @Override - public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) { + // Expand the pipeline when the user has requested periodically-triggered file writes. + private WriteResult expandTriggered(PCollection<KV<DestinationT, TableRow>> input) { + checkArgument(numFileShards > 0); Pipeline p = input.getPipeline(); + final PCollectionView<String> jobIdTokenView = createJobIdView(p); + final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView); + // The user-supplied triggeringFrequency is often chosen to control how many BigQuery load + // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this + // is set to a large value, currently we have to buffer all the data until the trigger fires. + // Instead, we ensure that the files are written if a threshold number of records are ready. + // We use only the user-supplied trigger on the actual BigQuery load. This allows us to + // offload the data to the filesystem.
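(For context, this triggered path corresponds to the following user-level configuration; a minimal sketch, where the table spec, schema object, shard count, and frequency are illustrative placeholders rather than values from this commit:

    PCollection<TableRow> rows = ...; // an unbounded input, e.g. read from Pub/Sub
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")
            .withSchema(tableSchema)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            // Required for FILE_LOADS on an unbounded input; choose a value that
            // keeps the load-job count well under BigQuery's daily quota.
            .withTriggeringFrequency(Duration.standardMinutes(10))
            .withNumFileShards(100));

All builder methods shown are either added by this change or part of the existing BigQueryIO API.)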
+ PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow = + input.apply( + "rewindowIntoGlobal", + Window.<KV<DestinationT, TableRow>>into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(triggeringFrequency), + AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT)))) + .discardingFiredPanes()); + PCollection<WriteBundlesToFiles.Result<DestinationT>> results = + writeShardedFiles(inputInGlobalWindow, tempFilePrefixView); + + // Apply the user's trigger before we start generating BigQuery load jobs. + results = + results.apply( + "applyUserTrigger", + Window.<WriteBundlesToFiles.Result<DestinationT>>into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(triggeringFrequency))) + .discardingFiredPanes()); - // Create a singleton job ID token at execution time. This will be used as the base for all - // load jobs issued from this instance of the transform. - final PCollection<String> jobIdToken = - p.apply("TriggerIdCreation", Create.of("ignored")) - .apply( - "CreateJobId", - MapElements.via( - new SimpleFunction<String, String>() { - @Override - public String apply(String input) { - return BigQueryHelpers.randomUUIDString(); - } - })); - final PCollectionView<String> jobIdTokenView = jobIdToken.apply(View.<String>asSingleton()); + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag = + new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag"); + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag = + new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag"); - PCollectionView<String> tempFilePrefix = jobIdToken + // If we have non-default triggered output, we can't use the side-input technique used in + // expandUntriggered . Instead make the result list a main input. Apply a GroupByKey first for + // determinism. + PCollectionTuple partitions = + results + .apply( + "AttachSingletonKey", + WithKeys.<Void, WriteBundlesToFiles.Result<DestinationT>>of((Void) null)) + .setCoder( + KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder))) + .apply("GroupOntoSingleton", GroupByKey.<Void, Result<DestinationT>>create()) + .apply("ExtractResultValues", Values.<Iterable<Result<DestinationT>>>create()) .apply( - "GetTempFilePrefix", + "WritePartitionTriggered", ParDo.of( - new DoFn<String, String>() { - @ProcessElement - public void getTempFilePrefix(ProcessContext c) { - String tempLocation = resolveTempLocation( - c.getPipelineOptions().getTempLocation(), - "BigQueryWriteTemp", c.element()); - LOG.info("Writing BigQuery temporary files to {} before loading them.", - tempLocation); - c.output(tempLocation); - } - })) - .apply("TempFilePrefixView", View.<String>asSingleton()); + new WritePartition<>( + singletonTable, + dynamicDestinations, + tempFilePrefixView, + multiPartitionsTag, + singlePartitionTag)) + .withSideInputs(tempFilePrefixView) + .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); + PCollection<KV<TableDestination, String>> tempTables = + writeTempTables(partitions.get(multiPartitionsTag), jobIdTokenView); + tempTables + // Now that the load job has happened, we want the rename to happen immediately. 
+ .apply( + Window.<KV<TableDestination, String>>into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))) + .apply(WithKeys.<Void, KV<TableDestination, String>>of((Void) null)) + .setCoder( + KvCoder.of( + VoidCoder.of(), KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()))) + .apply(GroupByKey.<Void, KV<TableDestination, String>>create()) + .apply(Values.<Iterable<KV<TableDestination, String>>>create()) + .apply( + "WriteRenameTriggered", + ParDo.of( + new WriteRename( + bigQueryServices, jobIdTokenView, writeDisposition, createDisposition)) + .withSideInputs(jobIdTokenView)); + writeSinglePartition(partitions.get(singlePartitionTag), jobIdTokenView); + return writeResult(p); + } + // Expand the pipeline when the user has not requested periodically-triggered file writes. + public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) { + Pipeline p = input.getPipeline(); + final PCollectionView<String> jobIdTokenView = createJobIdView(p); + final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView); PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow = input.apply( "rewindowIntoGlobal", Window.<KV<DestinationT, TableRow>>into(new GlobalWindows()) .triggering(DefaultTrigger.of()) .discardingFiredPanes()); - PCollectionView<Map<DestinationT, String>> schemasView = - inputInGlobalWindow.apply(new CalculateSchemas<>(dynamicDestinations)); + PCollection<WriteBundlesToFiles.Result<DestinationT>> results = + (numFileShards == 0) + ? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView) + : writeShardedFiles(inputInGlobalWindow, tempFilePrefixView); + + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag = + new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag = + new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {}; + + // This transform will look at the set of files written for each table, and if any table has + // too many files or bytes, will partition that table's files into multiple partitions for + // loading. + PCollectionTuple partitions = + results + .apply("ReifyResults", new ReifyAsIterable<WriteBundlesToFiles.Result<DestinationT>>()) + .setCoder(IterableCoder.of(WriteBundlesToFiles.ResultCoder.of(destinationCoder))) + .apply( + "WritePartitionUntriggered", + ParDo.of( + new WritePartition<>( + singletonTable, + dynamicDestinations, + tempFilePrefixView, + multiPartitionsTag, + singlePartitionTag)) + .withSideInputs(tempFilePrefixView) + .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); + PCollection<KV<TableDestination, String>> tempTables = + writeTempTables(partitions.get(multiPartitionsTag), jobIdTokenView); + + tempTables + .apply("ReifyRenameInput", new ReifyAsIterable<KV<TableDestination, String>>()) + .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()))) + .apply( + "WriteRenameUntriggered", + ParDo.of( + new WriteRename( + bigQueryServices, jobIdTokenView, writeDisposition, createDisposition)) + .withSideInputs(jobIdTokenView)); + writeSinglePartition(partitions.get(singlePartitionTag), jobIdTokenView); + return writeResult(p); + } + // Generate the base job id string. + private PCollectionView<String> createJobIdView(Pipeline p) { + // Create a singleton job ID token at execution time. 
This will be used as the base for all + // load jobs issued from this instance of the transform. + return p.apply("JobIdCreationRoot", Create.of((Void) null)) + .apply( + "CreateJobId", + MapElements.via( + new SimpleFunction<Void, String>() { + @Override + public String apply(Void input) { + return BigQueryHelpers.randomUUIDString(); + } + })) + .apply(View.<String>asSingleton()); + } + + // Generate the temporary-file prefix. + private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) { + return ((PCollection<String>) jobIdView.getPCollection()) + .apply( + "GetTempFilePrefix", + ParDo.of( + new DoFn<String, String>() { + @ProcessElement + public void getTempFilePrefix(ProcessContext c) { + String tempLocation = + resolveTempLocation( + c.getPipelineOptions().getTempLocation(), + "BigQueryWriteTemp", + c.element()); + LOG.info( + "Writing BigQuery temporary files to {} before loading them.", + tempLocation); + c.output(tempLocation); + } + })) + .apply("TempFilePrefixView", View.<String>asSingleton()); + } + + // Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename, + // file byte size, and table destination. + PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles( + PCollection<KV<DestinationT, TableRow>> input, PCollectionView<String> tempFilePrefix) { TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag = - new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles"){}; + new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {}; TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag = new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {}; - PCollectionTuple writeBundlesTuple = inputInGlobalWindow - .apply("WriteBundlesToFiles", - ParDo.of(new WriteBundlesToFiles<>(tempFilePrefix, unwrittedRecordsTag, - maxNumWritersPerBundle, maxFileSize)) - .withSideInputs(tempFilePrefix) - .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag))); + PCollectionTuple writeBundlesTuple = + input.apply( + "WriteBundlesToFiles", + ParDo.of( + new WriteBundlesToFiles<>( + tempFilePrefix, unwrittedRecordsTag, maxNumWritersPerBundle, maxFileSize)) + .withSideInputs(tempFilePrefix) + .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag))); PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles = - writeBundlesTuple.get(writtenFilesTag) - .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + writeBundlesTuple + .get(writtenFilesTag) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + PCollection<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecords = + writeBundlesTuple + .get(unwrittedRecordsTag) + .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of())); // If the bundles contain too many output tables to be written inline to files (due to memory // limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection. // Group these records by key, and write the files after grouping. Since the record is grouped // by key, we can ensure that only one file is open at a time in each bundle. 
PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFilesGrouped = - writeBundlesTuple - .get(unwrittedRecordsTag) - .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of())) - .apply(GroupByKey.<ShardedKey<DestinationT>, TableRow>create()) - .apply( - ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize)) - .withSideInputs(tempFilePrefix)) - .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + writeShardedRecords(unwrittenRecords, tempFilePrefix); // PCollection of filename, file byte size, and table destination. - PCollection<WriteBundlesToFiles.Result<DestinationT>> results = - PCollectionList.of(writtenFiles).and(writtenFilesGrouped) - .apply(Flatten.<Result<DestinationT>>pCollections()); + return PCollectionList.of(writtenFiles) + .and(writtenFilesGrouped) + .apply("FlattenFiles", Flatten.<Result<DestinationT>>pCollections()) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + } - TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag = - new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag = - new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {}; + // Writes input data to statically-sharded files. Returns a PCollection of filename, + // file byte size, and table destination. + PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles( + PCollection<KV<DestinationT, TableRow>> input, PCollectionView<String> tempFilePrefix) { + checkState(numFileShards > 0); + PCollection<KV<ShardedKey<DestinationT>, TableRow>> shardedRecords = + input + .apply( + "AddShard", + ParDo.of( + new DoFn<KV<DestinationT, TableRow>, KV<ShardedKey<DestinationT>, TableRow>>() { + int shardNumber; - // Turn the list of files and record counts in a PCollectionView that can be used as a - // side input. - PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView = - results.apply("ResultsView", - View.<WriteBundlesToFiles.Result<DestinationT>>asIterable()); - // This transform will look at the set of files written for each table, and if any table has - // too many files or bytes, will partition that table's files into multiple partitions for - // loading. 
- PCollection<Void> singleton = p.apply("singleton", - Create.of((Void) null).withCoder(VoidCoder.of())); - PCollectionTuple partitions = - singleton.apply( - "WritePartition", - ParDo.of( - new WritePartition<>( - singletonTable, - dynamicDestinations, - tempFilePrefix, - resultsView, - multiPartitionsTag, - singlePartitionTag)) - .withSideInputs(tempFilePrefix, resultsView) - .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); - - List<PCollectionView<?>> writeTablesSideInputs = - Lists.newArrayList(jobIdTokenView, schemasView); - writeTablesSideInputs.addAll(dynamicDestinations.getSideInputs()); + @Setup + public void setup() { + shardNumber = ThreadLocalRandom.current().nextInt(numFileShards); + } + + @ProcessElement + public void processElement(ProcessContext c) { + DestinationT destination = c.element().getKey(); + TableRow tableRow = c.element().getValue(); + c.output( + KV.of( + ShardedKey.of(destination, ++shardNumber % numFileShards), + tableRow)); + } + })) + .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of())); + + return writeShardedRecords(shardedRecords, tempFilePrefix); + } + + private PCollection<Result<DestinationT>> writeShardedRecords( + PCollection<KV<ShardedKey<DestinationT>, TableRow>> shardedRecords, + PCollectionView<String> tempFilePrefix) { + return shardedRecords + .apply("GroupByDestination", GroupByKey.<ShardedKey<DestinationT>, TableRow>create()) + .apply( + "WriteGroupedRecords", + ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize)) + .withSideInputs(tempFilePrefix)) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + } + + // Take in a list of files and write them to temporary tables. + private PCollection<KV<TableDestination, String>> writeTempTables( + PCollection<KV<ShardedKey<DestinationT>, List<String>>> input, + PCollectionView<String> jobIdTokenView) { + List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(jobIdTokenView); + sideInputs.addAll(dynamicDestinations.getSideInputs()); Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder = - KvCoder.of( - ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), - ListCoder.of(StringUtf8Coder.of())); + KvCoder.of( + ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), + ListCoder.of(StringUtf8Coder.of())); // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then // the import needs to be split into multiple partitions, and those partitions will be // specified in multiPartitionsTag. - PCollection<KV<TableDestination, String>> tempTables = - partitions - .get(multiPartitionsTag) - .setCoder(partitionsCoder) - // Reshuffle will distribute this among multiple workers, and also guard against - // reexecution of the WritePartitions step once WriteTables has begun. - .apply( - "MultiPartitionsReshuffle", - Reshuffle.<ShardedKey<DestinationT>, List<String>>of()) - .apply( - "MultiPartitionsWriteTables", - ParDo.of( - new WriteTables<>( - false, - bigQueryServices, - jobIdTokenView, - schemasView, - WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED, - dynamicDestinations)) - .withSideInputs(writeTablesSideInputs)); - - // This view maps each final table destination to the set of temporary partitioned tables - // the PCollection was loaded into. 
- PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = - tempTables.apply("TempTablesView", View.<TableDestination, String>asMultimap()); - - singleton.apply( - "WriteRename", - ParDo.of( - new WriteRename( - bigQueryServices, - jobIdTokenView, - writeDisposition, - createDisposition, - tempTablesView)) - .withSideInputs(tempTablesView, jobIdTokenView)); + return input + .setCoder(partitionsCoder) + // Reshuffle will distribute this among multiple workers, and also guard against + // reexecution of the WritePartitions step once WriteTables has begun. + .apply("MultiPartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of()) + .apply( + "MultiPartitionsWriteTables", + ParDo.of( + new WriteTables<>( + false, + bigQueryServices, + jobIdTokenView, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED, + dynamicDestinations)) + .withSideInputs(sideInputs)); + } + // In the case where the files fit into a single load job, there's no need to write temporary + // tables and rename. We can load these files directly into the target BigQuery table. + void writeSinglePartition( + PCollection<KV<ShardedKey<DestinationT>, List<String>>> input, + PCollectionView<String> jobIdTokenView) { + List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(jobIdTokenView); + sideInputs.addAll(dynamicDestinations.getSideInputs()); + Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder = + KvCoder.of( + ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), + ListCoder.of(StringUtf8Coder.of())); // Write single partition to final table - partitions - .get(singlePartitionTag) + input .setCoder(partitionsCoder) // Reshuffle will distribute this among multiple workers, and also guard against // reexecution of the WritePartitions step once WriteTables has begun. - .apply( - "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of()) + .apply("SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of()) .apply( "SinglePartitionWriteTables", ParDo.of( @@ -336,14 +515,20 @@ class BatchLoads<DestinationT> true, bigQueryServices, jobIdTokenView, - schemasView, writeDisposition, createDisposition, dynamicDestinations)) - .withSideInputs(writeTablesSideInputs)); + .withSideInputs(sideInputs)); + } + private WriteResult writeResult(Pipeline p) { PCollection<TableRow> empty = p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class))); - return WriteResult.in(input.getPipeline(), new TupleTag<TableRow>("failedInserts"), empty); + return WriteResult.in(p, new TupleTag<TableRow>("failedInserts"), empty); + } + + @Override + public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) { + return (triggeringFrequency != null) ? 
expandTriggered(input) : expandUntriggered(input); } } http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 6edbd06..feb085d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -92,19 +92,20 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@link PTransform}s for reading and writing - * <a href="https://developers.google.com/bigquery/">BigQuery</a> tables. + * {@link PTransform}s for reading and writing <a + * href="https://developers.google.com/bigquery/">BigQuery</a> tables. * * <h3>Table References</h3> * * <p>A fully-qualified BigQuery table name consists of three components: + * * <ul> - * <li>{@code projectId}: the Cloud project id (defaults to - * {@link GcpOptions#getProject()}). + * <li>{@code projectId}: the Cloud project id (defaults to {@link GcpOptions#getProject()}). * <li>{@code datasetId}: the BigQuery dataset id, unique within a project. * <li>{@code tableId}: a table id, unique within a dataset. * </ul> @@ -122,8 +123,8 @@ import org.slf4j.LoggerFactory; * * <h3>Reading</h3> * - * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation. - * This produces a {@link PCollection} of {@link TableRow TableRows} as output: + * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation. This produces a + * {@link PCollection} of {@link TableRow TableRows} as output: * * <pre>{@code * PCollection<TableRow> weatherData = pipeline.apply( @@ -146,12 +147,11 @@ import org.slf4j.LoggerFactory; * * <h3>Writing</h3> * - * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. - * This consumes either a {@link PCollection} of {@link TableRow TableRows} as input when using - * {@link BigQueryIO#writeTableRows()} or of a user-defined type when using - * {@link BigQueryIO#write()}. When using a user-defined type, a function must be provided to - * turn this type into a {@link TableRow} using - * {@link BigQueryIO.Write#withFormatFunction(SerializableFunction)}. + * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes + * either a {@link PCollection} of {@link TableRow TableRows} as input when using {@link + * BigQueryIO#writeTableRows()} or of a user-defined type when using {@link BigQueryIO#write()}. + * When using a user-defined type, a function must be provided to turn this type into a {@link + * TableRow} using {@link BigQueryIO.Write#withFormatFunction(SerializableFunction)}. * * <pre>{@code * PCollection<TableRow> quotes = ... @@ -216,8 +216,8 @@ import org.slf4j.LoggerFactory; * can also be useful when writing to a single table, as it allows a previous stage to calculate the * schema (possibly based on the full collection of records being written to BigQuery). 
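 * <p>For example, one sketch of this pattern (the table spec and the stage that computes the
 * schema map are placeholders, not part of this commit):
 *
 * <pre>{@code
 * PCollectionView<Map<String, String>> schemaView = ...; // table spec -> JSON table schema
 * quotes.apply(BigQueryIO.writeTableRows()
 *     .to("my-project:my_dataset.quotes")
 *     .withSchemaFromView(schemaView));
 * }</pre>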
* - <p>For the most general form of dynamic table destinations and schemas, look at - * {@link BigQueryIO.Write#to(DynamicDestinations)}. + * <p>For the most general form of dynamic table destinations and schemas, look at {@link + * BigQueryIO.Write#to(DynamicDestinations)}. * * <h3>Permissions</h3> * @@ -227,6 +227,15 @@ import org.slf4j.LoggerFactory; * * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control * </a> for security and permission related information specific to BigQuery. + * + * <h3>Insertion Method</h3> + * + * {@link BigQueryIO.Write} supports two methods of inserting data into BigQuery, specified using + * {@link BigQueryIO.Write#withMethod}. If no method is supplied, then a default method will be + * chosen based on the input PCollection. See {@link BigQueryIO.Write.Method} for more information + * about the methods. The different insertion methods provide different tradeoffs of cost, quota, + * and data consistency; please see the BigQuery documentation for more information about these + * tradeoffs. */ public class BigQueryIO { private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class); @@ -757,6 +766,8 @@ public class BigQueryIO { .setBigQueryServices(new BigQueryServicesImpl()) .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED) .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY) + .setNumFileShards(0) + .setMethod(Write.Method.DEFAULT) .build(); } @@ -771,6 +782,41 @@ public class BigQueryIO { /** Implementation of {@link #write}. */ @AutoValue public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> { + /** Determines the method used to insert data into BigQuery. */ + public enum Method { + /** + * The default behavior if no method is explicitly set. If the input is bounded, then file + * loads will be used. If the input is unbounded, then streaming inserts will be used. + */ + DEFAULT, + + /** + * Use BigQuery load jobs to insert data. Records will first be written to files, and these + * files will be loaded into BigQuery. This is the default method when the input is bounded. + * This method can be chosen for unbounded inputs as well, as long as a triggering frequency + * is also set using {@link #withTriggeringFrequency}. BigQuery has quotas on the number + * of load jobs allowed per day, so be careful not to trigger load jobs too + * frequently. For more information, see <a + * href="https://cloud.google.com/bigquery/docs/loading-data-cloud-storage">Loading Data from + * Cloud Storage</a>. + */ + FILE_LOADS, + + /** + * Use the BigQuery streaming insert API to insert data. This provides the lowest-latency + * insert path into BigQuery, and therefore is the default method when the input is unbounded. + * BigQuery will make a strong effort to ensure no duplicates when using this path; however, + * there are some scenarios in which BigQuery is unable to make this guarantee (see + * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over + * the output table to periodically clean these rare duplicates. Alternatively, using the + * {@link #FILE_LOADS} insert method does guarantee no duplicates, though the latency for the + * insert into BigQuery will be much higher. For more information, see <a + * href="https://cloud.google.com/bigquery/streaming-data-into-bigquery">Streaming Data into + * BigQuery</a>.
+ */ + STREAMING_INSERTS } @Nullable abstract ValueProvider<String> getJsonTableRef(); @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination> getTableFunction(); @@ -787,6 +833,14 @@ public class BigQueryIO { abstract BigQueryServices getBigQueryServices(); @Nullable abstract Integer getMaxFilesPerBundle(); @Nullable abstract Long getMaxFileSize(); + + abstract int getNumFileShards(); + + @Nullable + abstract Duration getTriggeringFrequency(); + + abstract Method getMethod(); + @Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy(); abstract Builder<T> toBuilder(); @@ -807,6 +861,13 @@ public class BigQueryIO { abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices); abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle); abstract Builder<T> setMaxFileSize(Long maxFileSize); + + abstract Builder<T> setNumFileShards(int numFileShards); + + abstract Builder<T> setTriggeringFrequency(Duration triggeringFrequency); + + abstract Builder<T> setMethod(Method method); + abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy); abstract Write<T> build(); @@ -992,6 +1053,40 @@ public class BigQueryIO { return toBuilder().setValidate(false).build(); } + /** + * Choose the method used to write data to BigQuery. See the Javadoc on {@link Method} for + * information about and restrictions of the different methods. + */ + public Write<T> withMethod(Method method) { + return toBuilder().setMethod(method).build(); + } + + /** + * Choose the frequency at which file writes are triggered. + * + * <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, and + * only when writing an unbounded {@link PCollection}. + * + * <p>Every triggeringFrequency duration, a BigQuery load job will be generated for all the data + * written since the last load job. BigQuery has limits on how many load jobs can be triggered + * per day, so be careful not to set this duration too low, or you may exceed daily quota. Often + * this is set to 5 or 10 minutes to ensure that the project stays well under the BigQuery + * quota. See <a href="https://cloud.google.com/bigquery/quota-policy">Quota Policy</a> for more + * information about BigQuery quotas. + */ + public Write<T> withTriggeringFrequency(Duration triggeringFrequency) { + return toBuilder().setTriggeringFrequency(triggeringFrequency).build(); + } + + /** + * Control how many file shards are written when using BigQuery load jobs. Applicable only when + * also setting {@link #withTriggeringFrequency}. The default value is 1000. + */ + @Experimental + public Write<T> withNumFileShards(int numFileShards) { + return toBuilder().setNumFileShards(numFileShards).build(); + } + @VisibleForTesting Write<T> withTestServices(BigQueryServices testServices) { return toBuilder().setBigQueryServices(testServices).build(); @@ -1029,6 +1124,17 @@ public class BigQueryIO { } } + private Method resolveMethod(PCollection<T> input) { + if (getMethod() != Method.DEFAULT) { + return getMethod(); + } + // By default, when writing an Unbounded PCollection, we use StreamingInserts and + // BigQuery's streaming import API. + return (input.isBounded() == IsBounded.UNBOUNDED) + ? Method.STREAMING_INSERTS + : Method.FILE_LOADS; + } + @Override public WriteResult expand(PCollection<T> input) { // We must have a destination to write to!
@@ -1048,6 +1154,7 @@ public class BigQueryIO { || getSchemaFromView() != null, "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided."); + List<?> allToArgs = Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations()); checkArgument(1 @@ -1061,7 +1168,21 @@ public class BigQueryIO { "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may " + "be set"); - + Method method = resolveMethod(input); + if (input.isBounded() == IsBounded.UNBOUNDED && method == Method.FILE_LOADS) { + checkArgument( + getTriggeringFrequency() != null, + "When writing an unbounded PCollection via FILE_LOADS, " + + "triggering frequency must be specified"); + } else { + checkArgument( + getTriggeringFrequency() == null && getNumFileShards() == 0, + "Triggering frequency or number of file shards can be specified only when writing " + + "an unbounded PCollection via FILE_LOADS, but: the collection was %s " + + "and the method was %s", + input.isBounded(), + method); + } DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations(); if (dynamicDestinations == null) { if (getJsonTableRef() != null) { @@ -1069,17 +1190,20 @@ public class BigQueryIO { DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef( getJsonTableRef(), getTableDescription()); } else if (getTableFunction() != null) { - dynamicDestinations = new TableFunctionDestinations(getTableFunction()); + dynamicDestinations = new TableFunctionDestinations<>(getTableFunction()); } // Wrap with a DynamicDestinations class that will provide a schema. There might be no // schema provided if the create disposition is CREATE_NEVER. if (getJsonSchema() != null) { dynamicDestinations = - new ConstantSchemaDestinations(dynamicDestinations, getJsonSchema()); + new ConstantSchemaDestinations<>( + (DynamicDestinations<T, TableDestination>) dynamicDestinations, getJsonSchema()); } else if (getSchemaFromView() != null) { dynamicDestinations = - new SchemaFromViewDestinations(dynamicDestinations, getSchemaFromView()); + new SchemaFromViewDestinations<>( + (DynamicDestinations<T, TableDestination>) dynamicDestinations, + getSchemaFromView()); } } return expandTyped(input, dynamicDestinations); @@ -1100,9 +1224,9 @@ public class BigQueryIO { .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction())) .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of())); - // When writing an Unbounded PCollection, we use StreamingInserts and BigQuery's streaming - // import API. 
- if (input.isBounded() == IsBounded.UNBOUNDED) { + Method method = resolveMethod(input); + + if (method == Method.STREAMING_INSERTS) { checkArgument( getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE, "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded" @@ -1129,6 +1253,8 @@ if (getMaxFileSize() != null) { batchLoads.setMaxFileSize(getMaxFileSize()); } + batchLoads.setTriggeringFrequency(getTriggeringFrequency()); + batchLoads.setNumFileShards(getNumFileShards()); return rowsWithDestination.apply(batchLoads); } } http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java new file mode 100644 index 0000000..18a359c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * This transform turns the input PCollection into a singleton PCollection of an Iterable, which + * can be used as the main input for another transform.
+ */ +public class ReifyAsIterable<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>> { + @Override + public PCollection<Iterable<T>> expand(PCollection<T> input) { + final PCollectionView<Iterable<T>> view = input.apply(View.<T>asIterable()); + return input + .getPipeline() + .apply(Create.of((Void) null).withCoder(VoidCoder.of())) + .apply( + ParDo.of( + new DoFn<Void, Iterable<T>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.sideInput(view)); + } + }) + .withSideInputs(view)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index e1ed746..e337f94 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -66,9 +66,10 @@ class WriteBundlesToFiles<DestinationT> private transient Map<DestinationT, TableRowWriter> writers; private transient Map<DestinationT, BoundedWindow> writerWindows; private final PCollectionView<String> tempFilePrefixView; - private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag; + private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecordsTag; private int maxNumWritersPerBundle; private long maxFileSize; + private int spilledShardNumber; /** * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file, @@ -133,11 +134,11 @@ class WriteBundlesToFiles<DestinationT> WriteBundlesToFiles( PCollectionView<String> tempFilePrefixView, - TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag, + TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecordsTag, int maxNumWritersPerBundle, long maxFileSize) { this.tempFilePrefixView = tempFilePrefixView; - this.unwrittedRecordsTag = unwrittedRecordsTag; + this.unwrittenRecordsTag = unwrittenRecordsTag; this.maxNumWritersPerBundle = maxNumWritersPerBundle; this.maxFileSize = maxFileSize; } @@ -148,6 +149,7 @@ class WriteBundlesToFiles<DestinationT> // bundles. this.writers = Maps.newHashMap(); this.writerWindows = Maps.newHashMap(); + this.spilledShardNumber = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR); } TableRowWriter createAndInsertWriter(DestinationT destination, String tempFilePrefix, @@ -174,9 +176,10 @@ class WriteBundlesToFiles<DestinationT> } else { // This means that we already had too many writers open in this bundle. "spill" this record // into the output. It will be grouped and written to a file in a subsequent stage. 
- c.output(unwrittedRecordsTag, - KV.of(ShardedKey.of(destination, - ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR)), + c.output( + unwrittenRecordsTag, + KV.of( + ShardedKey.of(destination, (++spilledShardNumber) % SPILLED_RECORD_SHARDING_FACTOR), c.element().getValue())); return; } http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 451d1bd..934f1ae 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; @@ -34,12 +35,13 @@ import org.apache.beam.sdk.values.TupleTag; * tablespec and the list of files corresponding to each partition of that table. */ class WritePartition<DestinationT> - extends DoFn<Void, KV<ShardedKey<DestinationT>, List<String>>> { + extends DoFn< + Iterable<WriteBundlesToFiles.Result<DestinationT>>, + KV<ShardedKey<DestinationT>, List<String>>> { private final boolean singletonTable; private final DynamicDestinations<?, DestinationT> dynamicDestinations; private final PCollectionView<String> tempFilePrefix; - private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results; - private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag; + @Nullable private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag; private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag; private static class PartitionData { @@ -104,12 +106,10 @@ class WritePartition<DestinationT> boolean singletonTable, DynamicDestinations<?, DestinationT> dynamicDestinations, PCollectionView<String> tempFilePrefix, - PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results, TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag, TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) { this.singletonTable = singletonTable; this.dynamicDestinations = dynamicDestinations; - this.results = results; this.tempFilePrefix = tempFilePrefix; this.multiPartitionsTag = multiPartitionsTag; this.singlePartitionTag = singlePartitionTag; @@ -117,8 +117,7 @@ class WritePartition<DestinationT> @ProcessElement public void processElement(ProcessContext c) throws Exception { - List<WriteBundlesToFiles.Result<DestinationT>> results = - Lists.newArrayList(c.sideInput(this.results)); + List<WriteBundlesToFiles.Result<DestinationT>> results = Lists.newArrayList(c.element()); // If there are no elements to write _and_ the user specified a constant output table, then // generate an empty table of that name. 
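(The Iterable main input consumed by the new WritePartition signature above is produced by the ReifyAsIterable transform introduced in this commit, in the untriggered path. A minimal standalone usage sketch, with illustrative names:

    PCollection<String> fileNames = ...;
    PCollection<Iterable<String>> allFileNames =
        fileNames.apply("ReifyFileNames", new ReifyAsIterable<String>());
    // allFileNames contains exactly one element: an Iterable over all of fileNames.
    // Callers in this commit also set an explicit IterableCoder on the result.

This replaces the earlier pattern of pairing a Create.of((Void) null) singleton with a View.asIterable() side input.)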
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index f641b32..eb1da5f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -22,9 +22,11 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -35,74 +37,85 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Copies temporary tables to destination table. + * Copies temporary tables to the destination table. The input element is an {@link Iterable} that + * provides the list of all temporary tables created for a given {@link TableDestination}. */ -class WriteRename extends DoFn<Void, Void> { +class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> { private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class); private final BigQueryServices bqServices; private final PCollectionView<String> jobIdToken; - private final WriteDisposition writeDisposition; - private final CreateDisposition createDisposition; - // Map from final destination to a list of temporary tables that need to be copied into it. - private final PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView; + // In the triggered scenario, the user-supplied create and write dispositions only apply to the + // first trigger pane, as that's when the table is created. Subsequent loads should always + // append to the table, and so use CREATE_NEVER and WRITE_APPEND dispositions respectively.
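(A condensed sketch of the pane-sensitive disposition selection this comment describes; the same logic appears in WriteRename and WriteTables below:

    // Only the first fired pane may create or truncate the destination table;
    // later panes must append to what the first load produced.
    WriteDisposition writeDisposition =
        (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
    CreateDisposition createDisposition =
        (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
)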
+ private final WriteDisposition firstPaneWriteDisposition; + private final CreateDisposition firstPaneCreateDisposition; public WriteRename( BigQueryServices bqServices, PCollectionView<String> jobIdToken, WriteDisposition writeDisposition, - CreateDisposition createDisposition, - PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView) { + CreateDisposition createDisposition) { this.bqServices = bqServices; this.jobIdToken = jobIdToken; - this.writeDisposition = writeDisposition; - this.createDisposition = createDisposition; - this.tempTablesView = tempTablesView; + this.firstPaneWriteDisposition = writeDisposition; + this.firstPaneCreateDisposition = createDisposition; } @ProcessElement public void processElement(ProcessContext c) throws Exception { - Map<TableDestination, Iterable<String>> tempTablesMap = - Maps.newHashMap(c.sideInput(tempTablesView)); - - // Process each destination table. - for (Map.Entry<TableDestination, Iterable<String>> entry : tempTablesMap.entrySet()) { - TableDestination finalTableDestination = entry.getKey(); - List<String> tempTablesJson = Lists.newArrayList(entry.getValue()); - // Do not copy if no temp tables are provided - if (tempTablesJson.size() == 0) { - return; - } + Multimap<TableDestination, String> tempTables = ArrayListMultimap.create(); + for (KV<TableDestination, String> entry : c.element()) { + tempTables.put(entry.getKey(), entry.getValue()); + } + for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) { + // Process each destination table. + writeRename(entry.getKey(), entry.getValue(), c); + } + } - List<TableReference> tempTables = Lists.newArrayList(); - for (String table : tempTablesJson) { - tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class)); - } + private void writeRename( + TableDestination finalTableDestination, Iterable<String> tempTableNames, ProcessContext c) + throws Exception { + WriteDisposition writeDisposition = + (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND; + CreateDisposition createDisposition = + (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER; + List<String> tempTablesJson = Lists.newArrayList(tempTableNames); + // Do not copy if no temp tables are provided + if (tempTablesJson.size() == 0) { + return; + } - // Make sure each destination table gets a unique job id. - String jobIdPrefix = BigQueryHelpers.createJobId( - c.sideInput(jobIdToken), finalTableDestination, -1); - - copy( - bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - finalTableDestination.getTableReference(), - tempTables, - writeDisposition, - createDisposition, - finalTableDestination.getTableDescription()); - - DatasetService tableService = - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); - removeTemporaryTables(tableService, tempTables); + List<TableReference> tempTables = Lists.newArrayList(); + for (String table : tempTablesJson) { + tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class)); } + + // Make sure each destination table gets a unique job id. 
+ String jobIdPrefix = + BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1); + + copy( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + finalTableDestination.getTableReference(), + tempTables, + writeDisposition, + createDisposition, + finalTableDestination.getTableDescription()); + + DatasetService tableService = + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + removeTemporaryTables(tableService, tempTables); } private void copy( @@ -174,9 +187,11 @@ class WriteRename extends DoFn<Void, Void> { super.populateDisplayData(builder); builder - .add(DisplayData.item("writeDisposition", writeDisposition.toString()) - .withLabel("Write Disposition")) - .add(DisplayData.item("createDisposition", createDisposition.toString()) - .withLabel("Create Disposition")); + .add( + DisplayData.item("firstPaneWriteDisposition", firstPaneWriteDisposition.toString()) + .withLabel("Write Disposition")) + .add( + DisplayData.item("firstPaneCreateDisposition", firstPaneCreateDisposition.toString()) + .withLabel("Create Disposition")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 9ed2916..24911a7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.io.IOException; import java.util.Collection; import java.util.List; @@ -65,35 +66,48 @@ class WriteTables<DestinationT> private final boolean singlePartition; private final BigQueryServices bqServices; private final PCollectionView<String> jobIdToken; - private final PCollectionView<Map<DestinationT, String>> schemasView; - private final WriteDisposition writeDisposition; - private final CreateDisposition createDisposition; + private final WriteDisposition firstPaneWriteDisposition; + private final CreateDisposition firstPaneCreateDisposition; private final DynamicDestinations<?, DestinationT> dynamicDestinations; + private Map<DestinationT, String> jsonSchemas = Maps.newHashMap(); public WriteTables( boolean singlePartition, BigQueryServices bqServices, PCollectionView<String> jobIdToken, - PCollectionView<Map<DestinationT, String>> schemasView, WriteDisposition writeDisposition, CreateDisposition createDisposition, DynamicDestinations<?, DestinationT> dynamicDestinations) { this.singlePartition = singlePartition; this.bqServices = bqServices; this.jobIdToken = jobIdToken; - this.schemasView = schemasView; - this.writeDisposition = writeDisposition; - this.createDisposition = createDisposition; + this.firstPaneWriteDisposition = writeDisposition; + this.firstPaneCreateDisposition = createDisposition; this.dynamicDestinations = dynamicDestinations; } + 
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9ed2916..24911a7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -65,35 +66,48 @@ class WriteTables<DestinationT>
   private final boolean singlePartition;
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
-  private final PCollectionView<Map<DestinationT, String>> schemasView;
-  private final WriteDisposition writeDisposition;
-  private final CreateDisposition createDisposition;
+  private final WriteDisposition firstPaneWriteDisposition;
+  private final CreateDisposition firstPaneCreateDisposition;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();

   public WriteTables(
       boolean singlePartition,
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
-      PCollectionView<Map<DestinationT, String>> schemasView,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
       DynamicDestinations<?, DestinationT> dynamicDestinations) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
-    this.schemasView = schemasView;
-    this.writeDisposition = writeDisposition;
-    this.createDisposition = createDisposition;
+    this.firstPaneWriteDisposition = writeDisposition;
+    this.firstPaneCreateDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
   }

+  @StartBundle
+  public void startBundle(StartBundleContext c) {
+    // Clear the map on each bundle so we can notice side-input updates.
+    // (alternative is to use a cache with a TTL).
+    jsonSchemas.clear();
+  }
+
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     dynamicDestinations.setSideInputAccessorFromProcessContext(c);
     DestinationT destination = c.element().getKey().getKey();
-    TableSchema tableSchema =
-        BigQueryHelpers.fromJsonString(
-            c.sideInput(schemasView).get(destination), TableSchema.class);
+    TableSchema tableSchema;
+    String jsonSchema = jsonSchemas.get(destination);
+    if (jsonSchema != null) {
+      tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+    } else {
+      tableSchema = dynamicDestinations.getSchema(destination);
+      if (tableSchema != null) {
+        jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
+      }
+    }
+
     TableDestination tableDestination = dynamicDestinations.getTable(destination);
     TableReference tableReference = tableDestination.getTableReference();
     if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
@@ -112,6 +126,10 @@ class WriteTables<DestinationT>
       tableReference.setTableId(jobIdPrefix);
     }

+    WriteDisposition writeDisposition =
+        (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+    CreateDisposition createDisposition =
+        (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
     load(
         bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
         bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
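The jsonSchemas map above replaces the old schemasView side input: WriteTables now asks DynamicDestinations for a schema the first time it sees a destination and memoizes it for the rest of the bundle, so a schema change is picked up on the next bundle. The lookup reduces to a bundle-scoped lazy cache, sketched below (getOrComputeSchema is a hypothetical name; the fields and helpers are the ones in the diff):

    // Bundle-scoped lazy cache: cleared in @StartBundle, filled on first use.
    private TableSchema getOrComputeSchema(DestinationT destination) {
      String json = jsonSchemas.get(destination);
      if (json != null) {
        return BigQueryHelpers.fromJsonString(json, TableSchema.class);
      }
      TableSchema schema = dynamicDestinations.getSchema(destination);
      if (schema != null) {
        // Cache the JSON form, which is what the load job ultimately consumes.
        jsonSchemas.put(destination, BigQueryHelpers.toJsonString(schema));
      }
      // May be null, in which case the load job runs without an explicit schema.
      return schema;
    }
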
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 8db4e94..3d53b7e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -47,12 +47,14 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
@@ -80,7 +82,6 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -91,6 +92,7 @@ import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
@@ -105,6 +107,9 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -130,20 +135,19 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -540,65 +544,73 @@ public class BigQueryIOTest implements Serializable {
     if (streaming) {
       users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
     }
-    users.apply("WriteBigQuery", BigQueryIO.<String>write()
+    users.apply(
+        "WriteBigQuery",
+        BigQueryIO.<String>write()
             .withTestServices(fakeBqServices)
             .withMaxFilesPerBundle(5)
            .withMaxFileSize(10)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-        .withFormatFunction(new SerializableFunction<String, TableRow>() {
-          @Override
-          public TableRow apply(String user) {
-            Matcher matcher = userPattern.matcher(user);
-            if (matcher.matches()) {
-              return new TableRow().set("name", matcher.group(1))
-                  .set("id", Integer.valueOf(matcher.group(2)));
-            }
-            throw new RuntimeException("Unmatching element " + user);
-          }
-        })
-        .to(new StringIntegerDestinations() {
-          @Override
-          public Integer getDestination(ValueInSingleWindow<String> element) {
-            assertThat(element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
-            Matcher matcher = userPattern.matcher(element.getValue());
-            if (matcher.matches()) {
-              // Since we name tables by userid, we can simply store an Integer to represent
-              // a table.
-              return Integer.valueOf(matcher.group(2));
-            }
-            throw new RuntimeException("Unmatching destination " + element.getValue());
-          }
+            .withFormatFunction(
+                new SerializableFunction<String, TableRow>() {
+                  @Override
+                  public TableRow apply(String user) {
+                    Matcher matcher = userPattern.matcher(user);
+                    if (matcher.matches()) {
+                      return new TableRow()
+                          .set("name", matcher.group(1))
+                          .set("id", Integer.valueOf(matcher.group(2)));
+                    }
+                    throw new RuntimeException("Unmatching element " + user);
+                  }
+                })
+            .to(
+                new StringIntegerDestinations() {
+                  @Override
+                  public Integer getDestination(ValueInSingleWindow<String> element) {
+                    assertThat(
+                        element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
+                    Matcher matcher = userPattern.matcher(element.getValue());
+                    if (matcher.matches()) {
+                      // Since we name tables by userid, we can simply store an Integer to represent
+                      // a table.
+                      return Integer.valueOf(matcher.group(2));
+                    }
+                    throw new RuntimeException("Unmatching destination " + element.getValue());
+                  }
-          @Override
-          public TableDestination getTable(Integer userId) {
-            verifySideInputs();
-            // Each user in its own table.
-            return new TableDestination("dataset-id.userid-" + userId,
-                "table for userid " + userId);
-          }
+                  @Override
+                  public TableDestination getTable(Integer userId) {
+                    verifySideInputs();
+                    // Each user in its own table.
+                    return new TableDestination(
+                        "dataset-id.userid-" + userId, "table for userid " + userId);
+                  }
-          @Override
-          public TableSchema getSchema(Integer userId) {
-            verifySideInputs();
-            return new TableSchema().setFields(
-                ImmutableList.of(
-                    new TableFieldSchema().setName("name").setType("STRING"),
-                    new TableFieldSchema().setName("id").setType("INTEGER")));
-          }
+                  @Override
+                  public TableSchema getSchema(Integer userId) {
+                    verifySideInputs();
+                    return new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("name").setType("STRING"),
+                                new TableFieldSchema().setName("id").setType("INTEGER")));
+                  }
-          @Override
-          public List<PCollectionView<?>> getSideInputs() {
-            return ImmutableList.of(sideInput1, sideInput2);
-          }
+                  @Override
+                  public List<PCollectionView<?>> getSideInputs() {
+                    return ImmutableList.of(sideInput1, sideInput2);
+                  }
-          private void verifySideInputs() {
-            assertThat(sideInput(sideInput1), containsInAnyOrder("a", "b", "c"));
-            Map<String, String> mapSideInput = sideInput(sideInput2);
-            assertEquals(3, mapSideInput.size());
-            assertThat(mapSideInput,
-                allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
-          }
-        })
+                  private void verifySideInputs() {
+                    assertThat(sideInput(sideInput1), containsInAnyOrder("a", "b", "c"));
+                    Map<String, String> mapSideInput = sideInput(sideInput2);
+                    assertEquals(3, mapSideInput.size());
+                    assertThat(
+                        mapSideInput,
+                        allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
+                  }
+                })
         .withoutValidation());

     p.run();
@@ -626,6 +638,59 @@ public class BigQueryIOTest implements Serializable {
   }

   @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testTriggeredFileLoads() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices =
+        new FakeBigQueryServices()
+            .withJobService(new FakeJobService())
+            .withDatasetService(datasetService);
+
+    List<TableRow> elements = Lists.newArrayList();
+    for (int i = 0; i < 30; ++i) {
+      elements.add(new TableRow().set("number", i));
+    }
+
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+    TestStream<TableRow> testStream =
+        TestStream.create(TableRowJsonCoder.of())
+            .addElements(
+                elements.get(0), Iterables.toArray(elements.subList(1, 10), TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                elements.get(10), Iterables.toArray(elements.subList(11, 20), TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                elements.get(20), Iterables.toArray(elements.subList(21, 30), TableRow.class))
+            .advanceWatermarkToInfinity();
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    p.apply(testStream)
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to("project-id:dataset-id.table-id")
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("number").setType("INTEGER"))))
+                .withTestServices(fakeBqServices)
+                .withTriggeringFrequency(Duration.standardSeconds(30))
+                .withNumFileShards(2)
+                .withMethod(Method.FILE_LOADS)
+                .withoutValidation());
+    p.run();
+
+    assertThat(
+        datasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+  }
+
+  @Test
   public void testRetryPolicy() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("project-id");
@@ -1796,25 +1861,24 @@ public class BigQueryIOTest implements Serializable {
     TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
         new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};

-    PCollectionView<Iterable<WriteBundlesToFiles.Result<TableDestination>>> resultsView =
-        p.apply(
-            Create.of(files)
-                .withCoder(WriteBundlesToFiles.ResultCoder.of(TableDestinationCoder.of())))
-            .apply(View.<WriteBundlesToFiles.Result<TableDestination>>asIterable());
     String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath();
     PCollectionView<String> tempFilePrefixView =
         p.apply(Create.of(tempFilePrefix)).apply(View.<String>asSingleton());

     WritePartition<TableDestination> writePartition =
-        new WritePartition<>(isSingleton, dynamicDestinations, tempFilePrefixView,
-            resultsView, multiPartitionsTag, singlePartitionTag);
-
-    DoFnTester<Void, KV<ShardedKey<TableDestination>, List<String>>> tester =
-        DoFnTester.of(writePartition);
-    tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
+        new WritePartition<>(
+            isSingleton,
+            dynamicDestinations,
+            tempFilePrefixView,
+            multiPartitionsTag,
+            singlePartitionTag);
+
+    DoFnTester<
+            Iterable<WriteBundlesToFiles.Result<TableDestination>>,
+            KV<ShardedKey<TableDestination>, List<String>>>
+        tester = DoFnTester.of(writePartition);
     tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix);
-    tester.processElement(null);
+    tester.processElement(files);

     List<KV<ShardedKey<TableDestination>, List<String>>> partitions;
     if (expectedNumPartitionsPerTable > 1) {
@@ -1864,7 +1928,7 @@ public class BigQueryIOTest implements Serializable {
     @Override
     public TableSchema getSchema(String destination) {
-      throw new UnsupportedOperationException("getSchema not expected in this test.");
+      return null;
     }
   }

@@ -1926,16 +1990,11 @@ public class BigQueryIOTest implements Serializable {
         .apply("CreateJobId", Create.of("jobId"))
         .apply(View.<String>asSingleton());

-    PCollectionView<Map<String, String>> schemaMapView =
-        p.apply("CreateEmptySchema",
-            Create.empty(new TypeDescriptor<KV<String, String>>() {}))
-            .apply(View.<String, String>asMap());
     WriteTables<String> writeTables =
         new WriteTables<>(
             false,
             fakeBqServices,
             jobIdTokenView,
-            schemaMapView,
             WriteDisposition.WRITE_EMPTY,
             CreateDisposition.CREATE_IF_NEEDED,
             new IdentityDynamicTables());
@@ -1943,7 +2002,6 @@ public class BigQueryIOTest implements Serializable {
     DoFnTester<KV<ShardedKey<String>, List<String>>, KV<TableDestination, String>> tester =
         DoFnTester.of(writeTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.<String, String>of());
     tester.getPipelineOptions().setTempLocation("tempLocation");
     for (KV<ShardedKey<String>, List<String>> partition : partitions) {
       tester.processElement(partition);
@@ -1999,21 +2057,14 @@ public class BigQueryIOTest implements Serializable {
     final int numTempTablesPerFinalTable = 3;
     final int numRecordsPerTempTable = 10;
-    Map<TableDestination, List<TableRow>> expectedRowsPerTable = Maps.newHashMap();
+    Multimap<TableDestination, TableRow> expectedRowsPerTable = ArrayListMultimap.create();
     String jobIdToken = "jobIdToken";
-    Map<TableDestination, Iterable<String>> tempTables = Maps.newHashMap();
+    Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
+    List<KV<TableDestination, String>> tempTablesElement = Lists.newArrayList();
     for (int i = 0; i < numFinalTables; ++i) {
       String tableName = "project-id:dataset-id.table_" + i;
       TableDestination tableDestination = new TableDestination(
           tableName, "table_" + i + "_desc");
-      List<String> tables = Lists.newArrayList();
-      tempTables.put(tableDestination, tables);
-
-      List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
-      if (expectedRows == null) {
-        expectedRows = Lists.newArrayList();
-        expectedRowsPerTable.put(tableDestination, expectedRows);
-      }
       for (int j = 0; j < numTempTablesPerFinalTable; ++j) {
         TableReference tempTable = new TableReference()
             .setProjectId("project-id")
             .setDatasetId("dataset-id")
             .setTableId(String.format("%s_%05d_%05d", jobIdToken, i, j));
         List<TableRow> rows = Lists.newArrayList();
         for (int k = 0; k < numRecordsPerTempTable; ++k) {
           rows.add(new TableRow().set("number", j * numTempTablesPerFinalTable + k));
         }
         datasetService.insertAll(tempTable, rows, null);
-        expectedRows.addAll(rows);
-        tables.add(BigQueryHelpers.toJsonString(tempTable));
+        expectedRowsPerTable.putAll(tableDestination, rows);
+        String tableJson = BigQueryHelpers.toJsonString(tempTable);
+        tempTables.put(tableDestination, tableJson);
+        tempTablesElement.add(KV.of(tableDestination, tableJson));
       }
     }
-    PCollection<KV<TableDestination, String>> tempTablesPCollection =
-        p.apply(Create.of(tempTables)
-            .withCoder(KvCoder.of(TableDestinationCoder.of(),
-                IterableCoder.of(StringUtf8Coder.of()))))
-            .apply(ParDo.of(new DoFn<KV<TableDestination, Iterable<String>>,
-                KV<TableDestination, String>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                TableDestination tableDestination = c.element().getKey();
-                for (String tempTable : c.element().getValue()) {
-                  c.output(KV.of(tableDestination, tempTable));
-                }
-              }
-            }));
-
-    PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
-        PCollectionViews.multimapView(
-            tempTablesPCollection,
-            WindowingStrategy.globalDefault(),
-            KvCoder.of(TableDestinationCoder.of(),
-                StringUtf8Coder.of()));
     PCollectionView<String> jobIdTokenView = p
         .apply("CreateJobId", Create.of("jobId"))
         .apply(View.<String>asSingleton());
-    WriteRename writeRename = new WriteRename(
-        fakeBqServices,
-        jobIdTokenView,
-        WriteDisposition.WRITE_EMPTY,
-        CreateDisposition.CREATE_IF_NEEDED,
-        tempTablesView);
+    WriteRename writeRename =
+        new WriteRename(
+            fakeBqServices,
+            jobIdTokenView,
+            WriteDisposition.WRITE_EMPTY,
+            CreateDisposition.CREATE_IF_NEEDED);
-    DoFnTester<Void, Void> tester = DoFnTester.of(writeRename);
-    tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
+    DoFnTester<Iterable<KV<TableDestination, String>>, Void> tester = DoFnTester.of(writeRename);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    tester.processElement(null);
+    tester.processElement(tempTablesElement);
-    for (Map.Entry<TableDestination, Iterable<String>> entry : tempTables.entrySet()) {
+    for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
       TableDestination tableDestination = entry.getKey();
       TableReference tableReference = tableDestination.getTableReference();
       Table table = checkNotNull(datasetService.getTable(tableReference));
       assertEquals(tableReference.getTableId() + "_desc", tableDestination.getTableDescription());
-      List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
+      Collection<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
       assertThat(datasetService.getAllRows(tableReference.getProjectId(),
           tableReference.getDatasetId(), tableReference.getTableId()),
           containsInAnyOrder(Iterables.toArray(expectedRows, TableRow.class)));
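The testTriggeredFileLoads test above exercises the user-visible surface of this change. In application code the same setup looks roughly like the following sketch (the class name, table spec, schema, rate, shard count, and frequency are all placeholders; GenerateSequence merely stands in for a real unbounded source):

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.joda.time.Duration;

    public class TriggeredFileLoadsExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // An unbounded stream of rows; ten elements per second, forever.
        p.apply(GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1)))
            .apply(
                MapElements.via(
                    new SimpleFunction<Long, TableRow>() {
                      @Override
                      public TableRow apply(Long i) {
                        return new TableRow().set("number", i);
                      }
                    }))
            .apply(
                BigQueryIO.writeTableRows()
                    .to("my-project:my_dataset.my_table")
                    .withSchema(
                        new TableSchema()
                            .setFields(
                                ImmutableList.of(
                                    new TableFieldSchema()
                                        .setName("number")
                                        .setType("INTEGER"))))
                    // FILE_LOADS on an unbounded PCollection requires a triggering
                    // frequency; each firing becomes one round of load jobs.
                    .withMethod(Method.FILE_LOADS)
                    .withTriggeringFrequency(Duration.standardMinutes(5))
                    .withNumFileShards(100));

        p.run();
      }
    }

Picking the frequency is a throughput/latency trade-off: firing more often makes rows queryable sooner, but it also generates more load jobs, which count against BigQuery's per-table daily load quota.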