Separate streaming writes into two pluggable components: CreateTables and StreamingWriteTables. Also address many code review comments and merge with master.
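At a glance, the split means the streaming path is now two chained, independently reusable PTransforms. A minimal sketch of how they compose, mirroring StreamingInserts.expand() in the diff below (the wrapper method itself is illustrative, not part of the commit):

    static WriteResult streamRows(
        PCollection<KV<TableDestination, TableRow>> rows,
        SerializableFunction<TableDestination, TableSchema> schemaFunction) {
      return rows
          // Side-effect step: create any missing destination tables up front.
          .apply("CreateTables",
              new CreateTables(CreateDisposition.CREATE_IF_NEEDED, schemaFunction))
          // Tag rows with unique ids and stream them to BigQuery with de-dup.
          .apply("StreamingWriteTables", new StreamingWriteTables());
    }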
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d13061c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d13061c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d13061c

Branch: refs/heads/DSL_SQL
Commit: 7d13061cc36466c502bbc1f61d391743dd3739af
Parents: b486137
Author: Reuven Lax <re...@google.com>
Authored: Sun Apr 2 21:39:50 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 176 ++++++++++---------
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |  13 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  21 ++-
 .../io/gcp/bigquery/BigQueryTableSource.java    |   4 +-
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  |  95 ++++++----
 .../io/gcp/bigquery/GenerateShardedTable.java   |   3 +-
 .../beam/sdk/io/gcp/bigquery/PrepareWrite.java  |  80 +++++----
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java    |   1 +
 .../sdk/io/gcp/bigquery/StreamingInserts.java   |  44 +----
 .../io/gcp/bigquery/StreamingWriteTables.java   |  86 +++++++++
 .../sdk/io/gcp/bigquery/TableDestination.java   |   1 +
 .../io/gcp/bigquery/TableDestinationCoder.java  |  62 +++----
 .../sdk/io/gcp/bigquery/TableRowWriter.java     |  14 +-
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |  14 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  25 +--
 .../sdk/io/gcp/bigquery/WritePartition.java     | 127 ++++++++-----
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |   5 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  17 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  66 ++++---
 19 files changed, 516 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 06fdfce..236b234 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
@@ -35,7 +34,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -47,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -54,17 +53,13 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
-/**
- * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
- */
+/** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
 class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
   BigQueryIO.Write<?> write;
 
-  private static class ConstantSchemaFunction implements
-      SerializableFunction<TableDestination, TableSchema> {
-    private final @Nullable
-    ValueProvider<String> jsonSchema;
+  private static class ConstantSchemaFunction
+      implements SerializableFunction<TableDestination, TableSchema> {
+    private final @Nullable ValueProvider<String> jsonSchema;
 
     ConstantSchemaFunction(ValueProvider<String> jsonSchema) {
       this.jsonSchema = jsonSchema;
@@ -86,7 +81,6 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
   public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
     Pipeline p = input.getPipeline();
     BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-    ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
 
     final String stepUuid = BigQueryHelpers.randomUUIDString();
@@ -94,40 +88,41 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
     String tempFilePrefix;
     try {
       IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-      tempFilePrefix = factory.resolve(
-          factory.resolve(tempLocation, "BigQueryWriteTemp"),
-          stepUuid);
+      tempFilePrefix =
+          factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
     } catch (IOException e) {
       throw new RuntimeException(
-          String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
-          e);
+          String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
     }
 
     // Create a singleton job ID token at execution time. This will be used as the base for all
     // load jobs issued from this instance of the transform.
     PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
-    PCollectionView<String> jobIdTokenView = p
-        .apply("TriggerIdCreation", Create.of("ignored"))
-        .apply("CreateJobId", MapElements.via(
-            new SimpleFunction<String, String>() {
-              @Override
-              public String apply(String input) {
-                return stepUuid;
-              }
-            }))
-        .apply(View.<String>asSingleton());
+    PCollectionView<String> jobIdTokenView =
+        p.apply("TriggerIdCreation", Create.of("ignored"))
+            .apply(
+                "CreateJobId",
+                MapElements.via(
+                    new SimpleFunction<String, String>() {
+                      @Override
+                      public String apply(String input) {
+                        return stepUuid;
+                      }
+                    }))
+            .apply(View.<String>asSingleton());
 
     PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow =
-        input.apply("rewindowIntoGlobal",
+        input.apply(
+            "rewindowIntoGlobal",
             Window.<KV<TableDestination, TableRow>>into(new GlobalWindows())
                 .triggering(DefaultTrigger.of())
                 .discardingFiredPanes());
 
     // PCollection of filename, file byte size, and table destination.
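Each element of that collection is a WriteBundlesToFiles.Result, whose fields (see its diff further down) carry exactly what WritePartition needs. Restated as a sketch (the literal values here are hypothetical):

    // One Result per temp file written by a bundle:
    WriteBundlesToFiles.Result result =
        new WriteBundlesToFiles.Result(
            "gs://tmp-bucket/BigQueryWriteTemp/step-uuid/file-0",  // temp file of TableRows
            67890L,            // byte size, used to decide partition rollover
            tableDestination); // the final BigQuery table for these rows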
- PCollection<WriteBundlesToFiles.Result> results = inputInGlobalWindow - .apply("WriteBundlesToFiles", - ParDo.of(new WriteBundlesToFiles(tempFilePrefix))) - .setCoder(WriteBundlesToFiles.ResultCoder.of()); + PCollection<WriteBundlesToFiles.Result> results = + inputInGlobalWindow + .apply("WriteBundlesToFiles", ParDo.of(new WriteBundlesToFiles(tempFilePrefix))) + .setCoder(WriteBundlesToFiles.ResultCoder.of()); TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag = new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {}; @@ -136,20 +131,23 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, // Turn the list of files and record counts in a PCollectionView that can be used as a // side input. - PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = results - .apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable()); + PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = + results.apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable()); // This transform will look at the set of files written for each table, and if any table has // too many files or bytes, will partition that table's files into multiple partitions for // loading. - PCollectionTuple partitions = singleton.apply("WritePartition", - ParDo.of(new WritePartition( - write.getJsonTableRef(), - write.getTableDescription(), - resultsView, - multiPartitionsTag, - singlePartitionTag)) - .withSideInputs(resultsView) - .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); + PCollectionTuple partitions = + singleton.apply( + "WritePartition", + ParDo.of( + new WritePartition( + write.getJsonTableRef(), + write.getTableDescription(), + resultsView, + multiPartitionsTag, + singlePartitionTag)) + .withSideInputs(resultsView) + .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant // schema function here. If no schema is specified, this function will return null. @@ -158,55 +156,69 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, new ConstantSchemaFunction(write.getJsonSchema()); Coder<KV<ShardedKey<TableDestination>, List<String>>> partitionsCoder = - KvCoder.of(ShardedKeyCoder.of(TableDestinationCoder.of()), - ListCoder.of(StringUtf8Coder.of())); + KvCoder.of( + ShardedKeyCoder.of(TableDestinationCoder.of()), ListCoder.of(StringUtf8Coder.of())); // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then // the import needs to be split into multiple partitions, and those partitions will be // specified in multiPartitionsTag. - PCollection<KV<TableDestination, String>> tempTables = partitions.get(multiPartitionsTag) - .setCoder(partitionsCoder) - // What's this GroupByKey for? Is this so we have a deterministic temp tables? If so, maybe - // Reshuffle is better here. 
- .apply("MultiPartitionsGroupByKey", - GroupByKey.<ShardedKey<TableDestination>, List<String>>create()) - .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( - false, - write.getBigQueryServices(), - jobIdTokenView, - tempFilePrefix, - WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED, - schemaFunction)) - .withSideInputs(jobIdTokenView)); + PCollection<KV<TableDestination, String>> tempTables = + partitions + .get(multiPartitionsTag) + .setCoder(partitionsCoder) + // Reshuffle will distribute this among multiple workers, and also guard against + // reexecution of the WritePartitions step once WriteTables has begun. + .apply( + "MultiPartitionsReshuffle", + Reshuffle.<ShardedKey<TableDestination>, List<String>>of()) + .apply( + "MultiPartitionsWriteTables", + ParDo.of( + new WriteTables( + false, + write.getBigQueryServices(), + jobIdTokenView, + tempFilePrefix, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED, + schemaFunction)) + .withSideInputs(jobIdTokenView)); // This view maps each final table destination to the set of temporary partitioned tables // the PCollection was loaded into. - PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = tempTables - .apply("TempTablesView", View.<TableDestination, String>asMultimap()); - - singleton.apply("WriteRename", ParDo - .of(new WriteRename( - write.getBigQueryServices(), - jobIdTokenView, - write.getWriteDisposition(), - write.getCreateDisposition(), - tempTablesView)) - .withSideInputs(tempTablesView, jobIdTokenView)); + PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = + tempTables.apply("TempTablesView", View.<TableDestination, String>asMultimap()); + + singleton.apply( + "WriteRename", + ParDo.of( + new WriteRename( + write.getBigQueryServices(), + jobIdTokenView, + write.getWriteDisposition(), + write.getCreateDisposition(), + tempTablesView)) + .withSideInputs(tempTablesView, jobIdTokenView)); // Write single partition to final table - partitions.get(singlePartitionTag) + partitions + .get(singlePartitionTag) .setCoder(partitionsCoder) - .apply("SinglePartitionGroupByKey", - GroupByKey.<ShardedKey<TableDestination>, List<String>>create()) - .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( - true, - write.getBigQueryServices(), - jobIdTokenView, - tempFilePrefix, - write.getWriteDisposition(), - write.getCreateDisposition(), - schemaFunction)) - .withSideInputs(jobIdTokenView)); + // Reshuffle will distribute this among multiple workers, and also guard against + // reexecution of the WritePartitions step once WriteTables has begun. 
+ .apply( + "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<TableDestination>, List<String>>of()) + .apply( + "SinglePartitionWriteTables", + ParDo.of( + new WriteTables( + true, + write.getBigQueryServices(), + jobIdTokenView, + tempFilePrefix, + write.getWriteDisposition(), + write.getCreateDisposition(), + schemaFunction)) + .withSideInputs(jobIdTokenView)); return WriteResult.in(input.getPipeline()); } http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 846103d..e04361c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; +import com.google.common.hash.Hashing; import java.io.IOException; import java.util.ArrayList; @@ -234,6 +235,18 @@ public class BigQueryHelpers { } } + // Create a unique job id for a table load. + static String createJobId(String prefix, TableDestination tableDestination, int partition) { + // Job ID must be different for each partition of each table. + String destinationHash = + Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString(); + if (partition >= 0) { + return String.format("%s_%s_%05d", prefix, destinationHash, partition); + } else { + return String.format("%s_%s", prefix, destinationHash); + } + } + @VisibleForTesting static class JsonSchemaToTableSchema implements SerializableFunction<String, TableSchema> { http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 54a25c7..3f5947e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -61,7 +61,6 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelFactory; @@ -445,7 +444,8 @@ public class BigQueryIO { // Note that a table or query check can fail if the table or dataset are created by // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline. 
// For these cases the withoutValidation method can be used to disable the check. - if (getValidate() && table != null) { + if (getValidate() && table != null && table.isAccessible() && table.get().getProjectId() + != null) { checkState(table.isAccessible(), "Cannot call validate if table is dynamically set."); // Check for source table presence for early failure notification. DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); @@ -650,6 +650,7 @@ public class BigQueryIO { public static <T> Write<T> write() { return new AutoValue_BigQueryIO_Write.Builder<T>() .setValidate(true) + .setTableDescription("") .setBigQueryServices(new BigQueryServicesImpl()) .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED) .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY) @@ -690,7 +691,8 @@ public class BigQueryIO { @Nullable abstract ValueProvider<String> getJsonSchema(); abstract CreateDisposition getCreateDisposition(); abstract WriteDisposition getWriteDisposition(); - @Nullable abstract String getTableDescription(); + /** Table description. Default is empty. */ + abstract String getTableDescription(); /** An option to indicate if table validation is desired. Default is true. */ abstract boolean getValidate(); abstract BigQueryServices getBigQueryServices(); @@ -805,9 +807,6 @@ public class BigQueryIO { public Write<T> to(ValueProvider<String> tableSpec) { ensureToNotCalledYet(); String tableDescription = getTableDescription(); - if (tableDescription == null) { - tableDescription = ""; - } return toBuilder() .setJsonTableRef( NestedValueProvider.of( @@ -911,7 +910,7 @@ public class BigQueryIO { public void validate(PCollection<T> input) { BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); - // Exactly one of the table and table reference can be configured. + // We must have a destination to write to! checkState(getTableFunction() != null, "must set the table reference of a BigQueryIO.Write transform"); @@ -972,8 +971,8 @@ public class BigQueryIO { @Override public WriteResult expand(PCollection<T> input) { PCollection<KV<TableDestination, TableRow>> rowsWithDestination = - input.apply("PrepareWrite", ParDo.of( - new PrepareWrite<T>(getTableFunction(), getFormatFunction()))) + input.apply("PrepareWrite", new PrepareWrite<T>( + getTableFunction(), getFormatFunction())) .setCoder(KvCoder.of(TableDestinationCoder.of(), TableRowJsonCoder.of())); @@ -1013,8 +1012,8 @@ public class BigQueryIO { .withLabel("Table WriteDisposition")) .addIfNotDefault(DisplayData.item("validation", getValidate()) .withLabel("Validation Enabled"), true) - .addIfNotNull(DisplayData.item("tableDescription", getTableDescription()) - .withLabel("Table Description")); + .addIfNotDefault(DisplayData.item("tableDescription", getTableDescription()) + .withLabel("Table Description"), ""); } /** Returns the table schema. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index 22aba64..a28da92 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -109,8 +109,8 @@ class BigQueryTableSource extends BigQuerySourceBase { @Override public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception { if (tableSizeBytes.get() == null) { - TableReference table = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), - TableReference.class); + TableReference table = setDefaultProjectIfAbsent(options.as(BigQueryOptions.class), + BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class)); Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class)) .getTable(table).getNumBytes(); http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index e216553..a78f32d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -1,68 +1,94 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; + import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; - +import org.apache.beam.sdk.values.PCollection; /** - * Creates any tables needed before performing streaming writes to the tables. This is a - * side-effect {l@ink DoFn}, and returns the original collection unchanged. + * Creates any tables needed before performing streaming writes to the tables. This is a side-effect + * {@link DoFn}, and returns the original collection unchanged. */ -public class CreateTables extends DoFn<KV<TableDestination, TableRow>, - KV<TableDestination, TableRow>> { +public class CreateTables + extends PTransform< + PCollection<KV<TableDestination, TableRow>>, PCollection<KV<TableDestination, TableRow>>> { private final CreateDisposition createDisposition; private final BigQueryServices bqServices; private final SerializableFunction<TableDestination, TableSchema> schemaFunction; - - /** The list of tables created so far, so we don't try the creation - each time. - * TODO: We should put a bound on memory usage of this. Use guava cache instead. + /** + * The list of tables created so far, so we don't try the creation each time. + * + * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead. 
*/ private static Set<String> createdTables = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); - public CreateTables(CreateDisposition createDisposition, BigQueryServices bqServices, - SerializableFunction<TableDestination, TableSchema> schemaFunction) { + public CreateTables( + CreateDisposition createDisposition, + SerializableFunction<TableDestination, TableSchema> schemaFunction) { + this(createDisposition, new BigQueryServicesImpl(), schemaFunction); + } + + private CreateTables( + CreateDisposition createDisposition, + BigQueryServices bqServices, + SerializableFunction<TableDestination, TableSchema> schemaFunction) { this.createDisposition = createDisposition; this.bqServices = bqServices; this.schemaFunction = schemaFunction; } - @ProcessElement - public void processElement(ProcessContext context) throws InterruptedException, IOException { - BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); - possibleCreateTable(options, context.element().getKey()); - context.output(context.element()); + CreateTables withTestServices(BigQueryServices bqServices) { + return new CreateTables(createDisposition, bqServices, schemaFunction); + } + + @Override + public PCollection<KV<TableDestination, TableRow>> expand( + PCollection<KV<TableDestination, TableRow>> input) { + return input.apply( + ParDo.of( + new DoFn<KV<TableDestination, TableRow>, KV<TableDestination, TableRow>>() { + @ProcessElement + public void processElement(ProcessContext context) + throws InterruptedException, IOException { + BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); + possibleCreateTable(options, context.element().getKey()); + context.output(context.element()); + } + })); } private void possibleCreateTable(BigQueryOptions options, TableDestination tableDestination) @@ -70,8 +96,7 @@ public class CreateTables extends DoFn<KV<TableDestination, TableRow>, String tableSpec = tableDestination.getTableSpec(); TableReference tableReference = tableDestination.getTableReference(); String tableDescription = tableDestination.getTableDescription(); - if (createDisposition != createDisposition.CREATE_NEVER - && !createdTables.contains(tableSpec)) { + if (createDisposition != createDisposition.CREATE_NEVER && !createdTables.contains(tableSpec)) { synchronized (createdTables) { // Another thread may have succeeded in creating the table in the meanwhile, so // check again. This check isn't needed for correctness, but we add it to prevent @@ -92,6 +117,8 @@ public class CreateTables extends DoFn<KV<TableDestination, TableRow>, } } + /** This method is used by the testing fake to clear static state. 
*/ + @VisibleForTesting static void clearCreatedTables() { synchronized (createdTables) { createdTables.clear(); http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java index da3a70a..90d41a0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java @@ -39,8 +39,7 @@ class GenerateShardedTable extends DoFn<KV<TableDestination, TableRow>, @ProcessElement public void processElement(ProcessContext context, BoundedWindow window) throws IOException { ThreadLocalRandom randomGenerator = ThreadLocalRandom.current(); - // We output on keys 0-50 to ensure that there's enough batching for - // BigQuery. + // We output on keys 0-numShards. String tableSpec = context.element().getKey().getTableSpec(); context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, numShards)), context.element().getValue())); http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java index 7712417..a8bdb43 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java @@ -1,20 +1,20 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableReference; @@ -23,6 +23,8 @@ import com.google.common.base.Strings; import java.io.IOException; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; @@ -30,37 +32,49 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.ValueInSingleWindow; /** - * Prepare an input {@link PCollection} for writing to BigQuery. Use the table-reference - * function to determine which tables each element is written to, and format the element into a - * {@link TableRow} using the user-supplied format function. + * Prepare an input {@link PCollection} for writing to BigQuery. Use the table function to determine + * which tables each element is written to, and format the element into a {@link TableRow} using the + * user-supplied format function. */ -public class PrepareWrite<T> extends DoFn<T, KV<TableDestination, TableRow>> { +public class PrepareWrite<T> + extends PTransform<PCollection<T>, PCollection<KV<TableDestination, TableRow>>> { private SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction; private SerializableFunction<T, TableRow> formatFunction; - public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction, - SerializableFunction<T, TableRow> formatFunction) { + public PrepareWrite( + SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction, + SerializableFunction<T, TableRow> formatFunction) { this.tableFunction = tableFunction; this.formatFunction = formatFunction; } - @ProcessElement - public void processElement(ProcessContext context, BoundedWindow window) throws IOException { - TableDestination tableDestination = tableSpecFromWindowedValue( - context.getPipelineOptions().as(BigQueryOptions.class), - ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane())); - TableRow tableRow = formatFunction.apply(context.element()); - context.output(KV.of(tableDestination, tableRow)); + @Override + public PCollection<KV<TableDestination, TableRow>> expand(PCollection<T> input) { + return input.apply( + ParDo.of( + new DoFn<T, KV<TableDestination, TableRow>>() { + @ProcessElement + public void processElement(ProcessContext context, BoundedWindow window) + throws IOException { + TableDestination tableDestination = + tableSpecFromWindowedValue( + context.getPipelineOptions().as(BigQueryOptions.class), + ValueInSingleWindow.of( + context.element(), context.timestamp(), window, context.pane())); + TableRow tableRow = formatFunction.apply(context.element()); + context.output(KV.of(tableDestination, tableRow)); + } + })); } - private TableDestination tableSpecFromWindowedValue(BigQueryOptions options, - ValueInSingleWindow<T> value) { + private TableDestination 
tableSpecFromWindowedValue( + BigQueryOptions options, ValueInSingleWindow<T> value) { TableDestination tableDestination = tableFunction.apply(value); TableReference tableReference = tableDestination.getTableReference(); if (Strings.isNullOrEmpty(tableReference.getProjectId())) { tableReference.setProjectId(options.getProject()); - tableDestination = new TableDestination(tableReference, - tableDestination.getTableDescription()); + tableDestination = + new TableDestination(tableReference, tableDestination.getTableDescription()); } return tableDestination; } http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java index 09b4fbf..c2b739f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java @@ -25,6 +25,7 @@ import java.util.Objects; * A key and a shard number. */ class ShardedKey<K> implements Serializable { + private static final long serialVersionUID = 1L; private final K key; private final int shardNumber; http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java index ced1d66..efd9c31 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java @@ -22,15 +22,10 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -38,8 +33,8 @@ import org.apache.beam.sdk.values.PCollection; * PTransform that performs streaming BigQuery write. To increase consistency, * it leverages BigQuery best effort de-dup mechanism. 
*/ -class StreamingInserts extends PTransform<PCollection<KV<TableDestination, TableRow>>, - WriteResult> { +public class StreamingInserts extends + PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> { private final Write<?> write; private static class ConstantSchemaFunction implements @@ -74,36 +69,11 @@ class StreamingInserts extends PTransform<PCollection<KV<TableDestination, Table SerializableFunction<TableDestination, TableSchema> schemaFunction = new ConstantSchemaFunction(write.getSchema()); - // A naive implementation would be to simply stream data directly to BigQuery. - // However, this could occasionally lead to duplicated data, e.g., when - // a VM that runs this code is restarted and the code is re-run. + PCollection<KV<TableDestination, TableRow>> writes = input + .apply("CreateTables", new CreateTables(write.getCreateDisposition(), schemaFunction) + .withTestServices(write.getBigQueryServices())); - // The above risk is mitigated in this implementation by relying on - // BigQuery built-in best effort de-dup mechanism. - - // To use this mechanism, each input TableRow is tagged with a generated - // unique id, which is then passed to BigQuery and used to ignore duplicates. - PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input - .apply("CreateTables", ParDo.of(new CreateTables(write.getCreateDisposition(), - write.getBigQueryServices(), schemaFunction))) - // We create 50 keys per BigQuery table to generate output on. This is few enough that we - // get good batching into BigQuery's insert calls, and enough that we can max out the - // streaming insert quota. - .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable(50))) - .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of())) - .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds())); - - // To prevent having the same TableRow processed more than once with regenerated - // different unique ids, this implementation relies on "checkpointing", which is - // achieved as a side effect of having StreamingWriteFn immediately follow a GBK, - // performed by Reshuffle. - tagged - .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) - .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) - .apply("StreamingWrite", - ParDo.of( - new StreamingWriteFn(write.getBigQueryServices()))); - - return WriteResult.in(input.getPipeline()); + return writes.apply(new StreamingWriteTables() + .withTestServices(write.getBigQueryServices())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java new file mode 100644 index 0000000..4ddc1df --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.Reshuffle; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * This transform takes in key-value pairs of {@link TableRow} entries and the + * {@link TableDestination} it should be written to. The BigQuery streaming-write service is used + * to stream these writes to the appropriate table. + * + * <p>This transform assumes that all destination tables already exist by the time it sees a write + * for that table. + */ +public class StreamingWriteTables extends PTransform< + PCollection<KV<TableDestination, TableRow>>, WriteResult> { + private BigQueryServices bigQueryServices; + + public StreamingWriteTables() { + this(new BigQueryServicesImpl()); + } + + private StreamingWriteTables(BigQueryServices bigQueryServices) { + this.bigQueryServices = bigQueryServices; + } + + StreamingWriteTables withTestServices(BigQueryServices bigQueryServices) { + return new StreamingWriteTables(bigQueryServices); + } + + @Override + public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) { + // A naive implementation would be to simply stream data directly to BigQuery. + // However, this could occasionally lead to duplicated data, e.g., when + // a VM that runs this code is restarted and the code is re-run. + + // The above risk is mitigated in this implementation by relying on + // BigQuery built-in best effort de-dup mechanism. + + // To use this mechanism, each input TableRow is tagged with a generated + // unique id, which is then passed to BigQuery and used to ignore duplicates + // We create 50 keys per BigQuery table to generate output on. This is few enough that we + // get good batching into BigQuery's insert calls, and enough that we can max out the + // streaming insert quota. + PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = + input.apply("ShardTableWrites", ParDo.of + (new GenerateShardedTable(50))) + .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of())) + .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds())); + + // To prevent having the same TableRow processed more than once with regenerated + // different unique ids, this implementation relies on "checkpointing", which is + // achieved as a side effect of having StreamingWriteFn immediately follow a GBK, + // performed by Reshuffle. 
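The "unique id" these comments describe becomes BigQuery's insertId, which the streaming service uses for best-effort de-dup when a bundle is retried. Roughly the wire-level shape, sketched with the google-api-services-bigquery model classes (not this file's exact code):

    // The same insertId on a retry lets BigQuery drop the duplicate row.
    TableDataInsertAllRequest.Rows entry =
        new TableDataInsertAllRequest.Rows()
            .setInsertId(uniqueId)  // id produced by TagWithUniqueIds
            .setJson(tableRow);     // the row payload itself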
+ tagged + .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) + .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) + .apply("StreamingWrite", + ParDo.of( + new StreamingWriteFn(bigQueryServices))); + return WriteResult.in(input.getPipeline()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index 36e1401..962e2cd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -27,6 +27,7 @@ import java.util.Objects; * Encapsulates a BigQuery table destination. */ public class TableDestination implements Serializable { + private static final long serialVersionUID = 1L; private final String tableSpec; private final String tableDescription; http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index fa24700..262a00d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -1,20 +1,20 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.io.gcp.bigquery; @@ -26,20 +26,18 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; -/** - * A coder for {@link TableDestination} objects. - */ +/** A coder for {@link TableDestination} objects. */ public class TableDestinationCoder extends AtomicCoder<TableDestination> { private static final TableDestinationCoder INSTANCE = new TableDestinationCoder(); - + private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); @JsonCreator public static TableDestinationCoder of() { - return INSTANCE; - } + return INSTANCE; + } @Override - public void encode(TableDestination value, OutputStream outStream, Context context) + public void encode(TableDestination value, OutputStream outStream, Context context) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); @@ -50,15 +48,13 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> { @Override public TableDestination decode(InputStream inStream, Context context) throws IOException { - return new TableDestination( - stringCoder.decode(inStream, context.nested()), - stringCoder.decode(inStream, context.nested())); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - return; - } + return new TableDestination( + stringCoder.decode(inStream, context.nested()), + stringCoder.decode(inStream, context.nested())); + } - StringUtf8Coder stringCoder = StringUtf8Coder.of(); + @Override + public void verifyDeterministic() throws NonDeterministicException { + return; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index ee8f466..91ef404 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -32,9 +32,7 @@ import org.apache.beam.sdk.util.MimeTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Writes {@TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. - */ +/** Writes {@TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. 
*/ class TableRowWriter { private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class); @@ -47,16 +45,18 @@ class TableRowWriter { protected String mimeType = MimeTypes.TEXT; private CountingOutputStream out; - public class Result { - String filename; - long byteSize; + public static final class Result { + final String filename; + final long byteSize; + public Result(String filename, long byteSize) { this.filename = filename; this.byteSize = byteSize; } } + TableRowWriter(String basename) { - this.tempFilePrefix = basename; + this.tempFilePrefix = basename; } public final void open(String uId) throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java index 7379784..284691e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java @@ -28,15 +28,14 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; /** - * Fn that tags each table row with a unique id and destination table. - * To avoid calling UUID.randomUUID() for each element, which can be costly, - * a randomUUID is generated only once per bucket of data. The actual unique - * id is created by concatenating this randomUUID with a sequential number. + * Fn that tags each table row with a unique id and destination table. To avoid calling + * UUID.randomUUID() for each element, which can be costly, a randomUUID is generated only once per + * bucket of data. The actual unique id is created by concatenating this randomUUID with a + * sequential number. */ @VisibleForTesting class TagWithUniqueIds extends DoFn<KV<ShardedKey<String>, TableRow>, KV<ShardedKey<String>, TableRowInfo>> { - private transient String randomUUID; private transient long sequenceNo = 0L; @@ -51,8 +50,9 @@ class TagWithUniqueIds String uniqueId = randomUUID + sequenceNo++; // We output on keys 0-50 to ensure that there's enough batching for // BigQuery. 
- context.output(KV.of(context.element().getKey(), - new TableRowInfo(context.element().getValue(), uniqueId))); + context.output( + KV.of( + context.element().getKey(), new TableRowInfo(context.element().getValue(), uniqueId))); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 869e68a..a25cc90 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -51,10 +51,11 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file, * and encapsulates the table it is destined to as well as the file byte size. */ - public static class Result implements Serializable { - public String filename; - public Long fileByteSize; - public TableDestination tableDestination; + public static final class Result implements Serializable { + private static final long serialVersionUID = 1L; + public final String filename; + public final Long fileByteSize; + public final TableDestination tableDestination; public Result(String filename, Long fileByteSize, TableDestination tableDestination) { this.filename = filename; @@ -68,6 +69,9 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund */ public static class ResultCoder extends AtomicCoder<Result> { private static final ResultCoder INSTANCE = new ResultCoder(); + private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); + private static final VarLongCoder longCoder = VarLongCoder.of(); + private static final TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of(); public static ResultCoder of() { return INSTANCE; @@ -87,18 +91,15 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund @Override public Result decode(InputStream inStream, Context context) throws IOException { - return new Result(stringCoder.decode(inStream, context.nested()), - longCoder.decode(inStream, context.nested()), - tableDestinationCoder.decode(inStream, context.nested())); + String filename = stringCoder.decode(inStream, context.nested()); + long fileByteSize = longCoder.decode(inStream, context.nested()); + TableDestination tableDestination = tableDestinationCoder.decode(inStream, context.nested()); + return new Result(filename, fileByteSize, tableDestination); } @Override public void verifyDeterministic() throws NonDeterministicException { } - - StringUtf8Coder stringCoder = StringUtf8Coder.of(); - VarLongCoder longCoder = VarLongCoder.of(); - TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of(); } WriteBundlesToFiles(String tempFilePrefix) { @@ -107,6 +108,8 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund @StartBundle public void startBundle(Context c) { + // This must be done each bundle, as by default the {@link DoFn} might be reused between + // bundles. 
this.writers = Maps.newHashMap(); } http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 9c48b82..9414909 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -44,7 +44,65 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List< private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag; private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag; - public WritePartition( + private static class PartitionData { + private int numFiles = 0; + private long byteSize = 0; + private List<String> filenames = Lists.newArrayList(); + + int getNumFiles() { + return numFiles; + } + + void addFiles(int numFiles) { + this.numFiles += numFiles; + } + + long getByteSize() { + return byteSize; + } + + void addBytes(long numBytes) { + this.byteSize += numBytes; + } + + List<String> getFilenames() { + return filenames; + } + + void addFilename(String filename) { + filenames.add(filename); + } + + // Check to see whether we can add to this partition without exceeding the maximum partition + // size. + boolean canAccept(int numFiles, long numBytes) { + return this.numFiles + numFiles <= Write.MAX_NUM_FILES + && this.byteSize + numBytes <= Write.MAX_SIZE_BYTES; + } + } + + private static class DestinationData { + private List<PartitionData> partitions = Lists.newArrayList(); + + DestinationData() { + // Always start out with a single empty partition. 
+      partitions.add(new PartitionData());
+    }
+
+    List<PartitionData> getPartitions() {
+      return partitions;
+    }
+
+    PartitionData getLatestPartition() {
+      return partitions.get(partitions.size() - 1);
+    }
+
+    void addPartition(PartitionData partition) {
+      partitions.add(partition);
+    }
+  }
+
+  WritePartition(
       ValueProvider<String> singletonOutputJsonTableRef,
       String singletonOutputTableDescription,
       PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView,
@@ -76,54 +134,41 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
     }
 
-    long partitionId = 0;
-    Map<TableDestination, Integer> currNumFilesMap = Maps.newHashMap();
-    Map<TableDestination, Long> currSizeBytesMap = Maps.newHashMap();
-    Map<TableDestination, List<List<String>>> currResultsMap = Maps.newHashMap();
-    for (int i = 0; i < results.size(); ++i) {
-      WriteBundlesToFiles.Result fileResult = results.get(i);
+    Map<TableDestination, DestinationData> currentResults = Maps.newHashMap();
+    for (WriteBundlesToFiles.Result fileResult : results) {
       TableDestination tableDestination = fileResult.tableDestination;
-      List<List<String>> partitions = currResultsMap.get(tableDestination);
-      if (partitions == null) {
-        partitions = Lists.newArrayList();
-        partitions.add(Lists.<String>newArrayList());
-        currResultsMap.put(tableDestination, partitions);
+      DestinationData destinationData = currentResults.get(tableDestination);
+      if (destinationData == null) {
+        destinationData = new DestinationData();
+        currentResults.put(tableDestination, destinationData);
       }
-      int currNumFiles = getOrDefault(currNumFilesMap, tableDestination, 0);
-      long currSizeBytes = getOrDefault(currSizeBytesMap, tableDestination, 0L);
-      if (currNumFiles + 1 > Write.MAX_NUM_FILES
-          || currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) {
-        // Add a new partition for this table.
-        partitions.add(Lists.<String>newArrayList());
-        // c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
-        currNumFiles = 0;
-        currSizeBytes = 0;
-        currNumFilesMap.remove(tableDestination);
-        currSizeBytesMap.remove(tableDestination);
+
+      PartitionData latestPartition = destinationData.getLatestPartition();
+      if (!latestPartition.canAccept(1, fileResult.fileByteSize)) {
+        // Too much data, roll over to a new partition.
+        latestPartition = new PartitionData();
+        destinationData.addPartition(latestPartition);
       }
-      currNumFilesMap.put(tableDestination, currNumFiles + 1);
-      currSizeBytesMap.put(tableDestination, currSizeBytes + fileResult.fileByteSize);
-      // Always add to the most recent partition for this table.
-      partitions.get(partitions.size() - 1).add(fileResult.filename);
+      latestPartition.addFilename(fileResult.filename);
+      latestPartition.addFiles(1);
+      latestPartition.addBytes(fileResult.fileByteSize);
     }
 
-    for (Map.Entry<TableDestination, List<List<String>>> entry : currResultsMap.entrySet()) {
+    // Now that we've figured out which tables and partitions to write out, emit this information
+    // to the next stage.
+    for (Map.Entry<TableDestination, DestinationData> entry : currentResults.entrySet()) {
       TableDestination tableDestination = entry.getKey();
-      List<List<String>> partitions = entry.getValue();
+      DestinationData destinationData = entry.getValue();
+      // In the fast-path case where we only output one table, the transform loads it directly
+      // to the final table. In this case, we output on a special TupleTag so the enclosing
+      // transform knows to skip the rename step.
       TupleTag<KV<ShardedKey<TableDestination>, List<String>>> outputTag =
-          (partitions.size() == 1) ? singlePartitionTag : multiPartitionsTag;
-      for (int i = 0; i < partitions.size(); ++i) {
-        c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1), partitions.get(i)));
+          (destinationData.getPartitions().size() == 1) ? singlePartitionTag : multiPartitionsTag;
+      for (int i = 0; i < destinationData.getPartitions().size(); ++i) {
+        PartitionData partitionData = destinationData.getPartitions().get(i);
+        c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1),
+            partitionData.getFilenames()));
       }
     }
   }
-
-  private <T> T getOrDefault(Map<TableDestination, T> map, TableDestination tableDestination,
-      T defaultValue) {
-    if (map.containsKey(tableDestination)) {
-      return map.get(tableDestination);
-    } else {
-      return defaultValue;
-    }
-  }
 }
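The refactored WritePartition logic above is easier to follow in isolation: each destination starts with one empty partition, files are appended to the latest partition, and a new partition is started whenever adding a file would exceed either limit. A self-contained sketch of that packing strategy (the two constants are placeholders for Write.MAX_NUM_FILES and Write.MAX_SIZE_BYTES, whose real values are not shown in this diff):

    // Illustrative sketch of the partitioning strategy above; limits are placeholders.
    import java.util.ArrayList;
    import java.util.List;

    class PartitionPacker {
      static final int MAX_NUM_FILES = 10000;        // placeholder for Write.MAX_NUM_FILES
      static final long MAX_SIZE_BYTES = 11L << 40;  // placeholder for Write.MAX_SIZE_BYTES

      static List<List<String>> pack(List<String> filenames, List<Long> sizes) {
        List<List<String>> partitions = new ArrayList<>();
        partitions.add(new ArrayList<String>());     // always start with one empty partition
        int numFiles = 0;
        long numBytes = 0;
        for (int i = 0; i < filenames.size(); ++i) {
          long size = sizes.get(i);
          // Mirrors PartitionData.canAccept: check limits before adding the file.
          if (numFiles + 1 > MAX_NUM_FILES || numBytes + size > MAX_SIZE_BYTES) {
            partitions.add(new ArrayList<String>()); // roll over to a new partition
            numFiles = 0;
            numBytes = 0;
          }
          partitions.get(partitions.size() - 1).add(filenames.get(i));
          numFiles++;
          numBytes += size;
        }
        return partitions;
      }
    }

Packing greedily into the latest partition keeps the algorithm single-pass, which is why PartitionData.canAccept checks the limits before a file is added rather than after.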
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 752e7d3..9b1c989 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -89,8 +89,9 @@ class WriteRename extends DoFn<String, Void> {
     }
 
     // Make sure each destination table gets a unique job id.
-    String jobIdPrefix = String.format(
-        c.sideInput(jobIdToken) + "0x%08x", finalTableDestination.hashCode());
+    String jobIdPrefix = BigQueryHelpers.createJobId(
+        c.sideInput(jobIdToken), finalTableDestination, -1);
+
     copy(
         bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
         bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
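Both WriteRename above and WriteTables below now delegate job-id construction to BigQueryHelpers.createJobId. Its body lands in BigQueryHelpers.java and is not shown in this diff; a plausible sketch, consistent with the String.format calls it replaces (destination hash plus partition number, with -1 as the no-partition value passed by WriteRename), might look like:

    // Hypothetical reconstruction; the real implementation is in BigQueryHelpers.java,
    // which this diff does not include.
    static String createJobId(String prefix, TableDestination tableDestination, int partition) {
      // Job ID must be different for each partition of each table.
      if (partition >= 0) {
        return String.format("%s_0x%08x_%05d", prefix, tableDestination.hashCode(), partition);
      }
      return String.format("%s_0x%08x", prefix, tableDestination.hashCode());
    }

Centralizing the format also removes a subtle inconsistency: the removed WriteRename code concatenated "0x%08x" with no separator, while WriteTables used "_0x%08x_%05d".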
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index f7fe87b..4a6cd2b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -57,11 +57,15 @@ import org.slf4j.LoggerFactory;
 /**
  * Writes partitions to BigQuery tables.
  *
- * <p>The input is a list of files corresponding to a partition of a table. These files are
+ * <p>The input is a list of files corresponding to each partition of a table. These files are
  * loaded into a temporary table (or into the final table if there is only one partition). The output
- * is a {@link KV} mapping the final table to the temporary tables for each partition of that table.
+ * is a {@link KV} mapping each final table to a list of the temporary tables containing its data.
+ *
+ * <p>In the case where all the data in the files fits into a single load job, this transform loads
+ * the data directly into the final table, skipping temporary tables. In this case, the output
+ * {@link KV} maps the final table to itself.
  */
-class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, Iterable<List<String>>>,
+class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>,
     KV<TableDestination, String>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
 
@@ -94,10 +98,9 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, Iterable<List<St
   public void processElement(ProcessContext c) throws Exception {
     TableDestination tableDestination = c.element().getKey().getKey();
     Integer partition = c.element().getKey().getShardNumber();
-    List<String> partitionFiles = Lists.newArrayList(c.element().getValue()).get(0);
-    // Job ID must be different for each partition of each table.
-    String jobIdPrefix = String.format(
-        c.sideInput(jobIdToken) + "_0x%08x_%05d", tableDestination.hashCode(), partition);
+    List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
+    String jobIdPrefix = BigQueryHelpers.createJobId(
+        c.sideInput(jobIdToken), tableDestination, partition);
 
     TableReference ref = tableDestination.getTableReference();
     if (!singlePartition) {
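The input type change above (Iterable<List<String>> to a plain List<String>) removes an awkward unwrapping step: each element used to carry a one-element Iterable from which processElement extracted the first list via get(0), whereas the partition's file list now arrives directly. A small runnable sketch contrasting the two element shapes (file paths are illustrative only):

    // Illustrative only: contrasts the old and new WriteTables element value shapes.
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ElementShapes {
      public static void main(String[] args) {
        List<String> files = Arrays.asList("gs://bucket/file1", "gs://bucket/file2");

        // Old shape: a one-element Iterable wrapping the partition's file list,
        // unwrapped with get(0) as in the removed line above.
        Iterable<List<String>> oldValue = Collections.singleton(files);
        List<String> unwrapped = oldValue.iterator().next();

        // New shape: the file list itself is the element value.
        List<String> newValue = files;

        System.out.println(unwrapped.equals(newValue)); // prints true
      }
    }

The same simplification shows up in the test changes below, where Collections.singleton(filesPerPartition) becomes just filesPerPartition.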
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f10be13..d0004e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
@@ -122,7 +123,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -607,13 +607,11 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
   public void testStreamingWriteWithDynamicTables() throws Exception {
     testWriteWithDynamicTables(true);
   }
 
   @Test
-  @Category(NeedsRunner.class)
   public void testBatchWriteWithDynamicTables() throws Exception {
     testWriteWithDynamicTables(false);
   }
@@ -842,7 +840,7 @@ public class BigQueryIOTest implements Serializable {
         BigQueryIO.writeTableRows().to("foo.com:project:somedataset.sometable");
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -894,7 +892,7 @@ public class BigQueryIOTest implements Serializable {
         null,
         CreateDisposition.CREATE_IF_NEEDED,
         WriteDisposition.WRITE_EMPTY,
-        null,
+        "",
         false);
   }
 
   @Test
@@ -905,7 +903,7 @@ public class BigQueryIOTest implements Serializable {
     checkWriteObject(
         write, null, "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY,
-        null);
+        "");
   }
 
   @Test
@@ -917,7 +915,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows().to(table);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -927,7 +925,7 @@ public class BigQueryIOTest implements Serializable {
         BigQueryIO.<TableRow>write().to("foo.com:project:somedataset.sometable").withSchema(schema);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -937,7 +935,7 @@ public class BigQueryIOTest implements Serializable {
             .withCreateDisposition(CreateDisposition.CREATE_NEVER);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -947,7 +945,7 @@ public class BigQueryIOTest implements Serializable {
             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -957,7 +955,7 @@ public class BigQueryIOTest implements Serializable {
             .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, "");
   }
 
   @Test
@@ -967,7 +965,7 @@ public class BigQueryIOTest implements Serializable {
             .withWriteDisposition(WriteDisposition.WRITE_APPEND);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, "");
   }
 
   @Test
@@ -977,7 +975,7 @@ public class BigQueryIOTest implements Serializable {
             .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -1359,7 +1357,6 @@ public class BigQueryIOTest implements Serializable {
 
     SourceTestUtils.assertSplitAtFractionBehavior(
         bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(2, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
@@ -1626,9 +1623,11 @@ public class BigQueryIOTest implements Serializable {
     TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
         new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
 
+    PCollection<WriteBundlesToFiles.Result> filesPCollection =
+        p.apply(Create.of(files).withType(new TypeDescriptor<WriteBundlesToFiles.Result>() {}));
     PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
         PCollectionViews.iterableView(
-            p,
+            filesPCollection,
             WindowingStrategy.globalDefault(),
             WriteBundlesToFiles.ResultCoder.of());
 
@@ -1699,14 +1698,12 @@ public class BigQueryIOTest implements Serializable {
 
     Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
 
-    List<KV<ShardedKey<TableDestination>, Iterable<List<String>>>> partitions =
-        Lists.newArrayList();
+    List<KV<ShardedKey<TableDestination>, List<String>>> partitions = Lists.newArrayList();
     for (int i = 0; i < numTables; ++i) {
       String tableName = String.format("project-id:dataset-id.table%05d", i);
       TableDestination tableDestination = new TableDestination(tableName, tableName);
       for (int j = 0; j < numPartitions; ++j) {
-        String tempTableId = String.format(
-            jobIdToken + "_0x%08x_%05d", tableDestination.hashCode(), j);
+        String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j);
         List<String> filesPerPartition = Lists.newArrayList();
         for (int k = 0; k < numFilesPerPartition; ++k) {
           String filename = Paths.get(baseDir.toString(),
@@ -1721,7 +1718,7 @@ public class BigQueryIOTest implements Serializable {
           filesPerPartition.add(filename);
         }
         partitions.add(KV.of(ShardedKey.of(tableDestination, j),
-            (Iterable<List<String>>) Collections.singleton(filesPerPartition)));
+            filesPerPartition));
 
         List<String> expectedTables = expectedTempTables.get(tableDestination);
         if (expectedTables == null) {
@@ -1735,11 +1732,6 @@ public class BigQueryIOTest implements Serializable {
       }
     }
 
-    PCollection<String> expectedTempTablesPCollection = p.apply(Create.of(expectedTempTables));
-    PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
-        expectedTempTablesPCollection,
-        WindowingStrategy.globalDefault(),
-        StringUtf8Coder.of());
     PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
     PCollectionView<String> jobIdTokenView =
         jobIdTokenCollection.apply(View.<String>asSingleton());
@@ -1753,10 +1745,10 @@ public class BigQueryIOTest implements Serializable {
         CreateDisposition.CREATE_IF_NEEDED,
         null);
 
-    DoFnTester<KV<ShardedKey<TableDestination>, Iterable<List<String>>>,
+    DoFnTester<KV<ShardedKey<TableDestination>, List<String>>,
         KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    for (KV<ShardedKey<TableDestination>, Iterable<List<String>>> partition : partitions) {
+    for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
       tester.processElement(partition);
     }
 
@@ -1848,11 +1840,27 @@ public class BigQueryIOTest implements Serializable {
       }
     }
 
+    PCollection<KV<TableDestination, String>> tempTablesPCollection =
+        p.apply(Create.of(tempTables)
+            .withCoder(KvCoder.of(TableDestinationCoder.of(),
+                IterableCoder.of(StringUtf8Coder.of()))))
+        .apply(ParDo.of(new DoFn<KV<TableDestination, Iterable<String>>,
+            KV<TableDestination, String>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            TableDestination tableDestination = c.element().getKey();
+            for (String tempTable : c.element().getValue()) {
+              c.output(KV.of(tableDestination, tempTable));
+            }
+          }
+        }));
+
     PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
         PCollectionViews.multimapView(
-            p,
+            tempTablesPCollection,
             WindowingStrategy.globalDefault(),
-            KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()));
+            KvCoder.of(TableDestinationCoder.of(),
+                StringUtf8Coder.of()));
 
     PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
     PCollectionView<String> jobIdTokenView =