Repository: beam Updated Branches: refs/heads/master 17f0843eb -> a1d82c203
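The diffs below generalize the BigQueryIO write path: collections that were keyed on a concrete TableDestination are now keyed on a user-chosen destination type DestinationT, produced by a DynamicDestinations<T, DestinationT> callback. A hedged sketch of the user-facing wiring this enables, distilled from the writeDynamicDestinations test added to BigQueryIOTest.java below — the class name, table specs, and schema are illustrative, and the exact DynamicDestinations signatures are assumed from the overrides visible in that test:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

public class DynamicDestinationsSketch {
  public static void main(String[] args) {
    BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);
    options.setProject("project-id"); // hypothetical project
    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("bill1", "sam2", "laurence3"))
        .apply(BigQueryIO.<String>write()
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withFormatFunction(new SerializableFunction<String, TableRow>() {
              @Override
              public TableRow apply(String user) {
                return new TableRow().set("name", user);
              }
            })
            .to(new DynamicDestinations<String, String>() {
              // Route each element to a destination key; the key should stay
              // small, since downstream stages group and shuffle on it.
              @Override
              public String getDestination(ValueInSingleWindow<String> element) {
                return element.getValue();
              }

              // Expand the key into a concrete table only where one is needed.
              @Override
              public TableDestination getTable(String user) {
                return new TableDestination(
                    "dataset-id.user_" + user, "table for user " + user);
              }

              // Supply a per-destination schema for CREATE_IF_NEEDED.
              @Override
              public TableSchema getSchema(String user) {
                return new TableSchema().setFields(ImmutableList.of(
                    new TableFieldSchema().setName("name").setType("STRING")));
              }
            }));
    p.run();
  }
}

Because the intermediate collections are keyed on the small DestinationT value rather than a full TableDestination, getTable and getSchema only need to be consulted where tables are created and load jobs are issued.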
http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java index 73d8eb7..1e451cc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java @@ -17,9 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; -import com.google.common.base.Strings; import java.io.IOException; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -35,46 +33,34 @@ import org.apache.beam.sdk.values.ValueInSingleWindow; * which tables each element is written to, and format the element into a {@link TableRow} using the * user-supplied format function. */ -public class PrepareWrite<T> - extends PTransform<PCollection<T>, PCollection<KV<TableDestination, TableRow>>> { - private SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction; +public class PrepareWrite<T, DestinationT> + extends PTransform<PCollection<T>, PCollection<KV<DestinationT, TableRow>>> { + private DynamicDestinations<T, DestinationT> dynamicDestinations; private SerializableFunction<T, TableRow> formatFunction; public PrepareWrite( - SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction, + DynamicDestinations<T, DestinationT> dynamicDestinations, SerializableFunction<T, TableRow> formatFunction) { - this.tableFunction = tableFunction; + this.dynamicDestinations = dynamicDestinations; this.formatFunction = formatFunction; } @Override - public PCollection<KV<TableDestination, TableRow>> expand(PCollection<T> input) { + public PCollection<KV<DestinationT, TableRow>> expand(PCollection<T> input) { return input.apply( ParDo.of( - new DoFn<T, KV<TableDestination, TableRow>>() { + new DoFn<T, KV<DestinationT, TableRow>>() { @ProcessElement public void processElement(ProcessContext context, BoundedWindow window) throws IOException { - TableDestination tableDestination = - tableSpecFromWindowedValue( - context.getPipelineOptions().as(BigQueryOptions.class), + dynamicDestinations.setSideInputAccessorFromProcessContext(context); + DestinationT tableDestination = + dynamicDestinations.getDestination( ValueInSingleWindow.of( context.element(), context.timestamp(), window, context.pane())); TableRow tableRow = formatFunction.apply(context.element()); context.output(KV.of(tableDestination, tableRow)); } - })); - } - - private TableDestination tableSpecFromWindowedValue( - BigQueryOptions options, ValueInSingleWindow<T> value) { - TableDestination tableDestination = tableFunction.apply(value); - TableReference tableReference = tableDestination.getTableReference(); - if (Strings.isNullOrEmpty(tableReference.getProjectId())) { - tableReference.setProjectId(options.getProject()); - tableDestination = - new TableDestination(tableReference, tableDestination.getTableDescription()); - } - return tableDestination; + }).withSideInputs(dynamicDestinations.getSideInputs())); } } 
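Note how the rewritten expand() wires the destination function's side inputs onto the ParDo (withSideInputs(dynamicDestinations.getSideInputs())) and installs the ProcessContext before calling getDestination: that is the hook that lets a DynamicDestinations implementation consult pipeline-computed values. A hedged sketch of such an implementation — the sideInput(...) accessor and the getSideInputs() override mirror the test added to BigQueryIOTest.java further down, while the class, view, and field names here are illustrative:

import java.util.List;
import java.util.Map;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;

/** Routes rows to a dataset chosen by a side input computed earlier in the pipeline. */
class SideInputRoutedDestinations extends DynamicDestinations<TableRow, String> {
  // shard -> dataset name, materialized upstream with View.asMap().
  private final PCollectionView<Map<String, String>> datasetForShard;

  SideInputRoutedDestinations(PCollectionView<Map<String, String>> datasetForShard) {
    this.datasetForShard = datasetForShard;
  }

  // PrepareWrite attaches these views to its ParDo, which is what makes
  // the sideInput() calls below legal at processing time.
  @Override
  public List<PCollectionView<?>> getSideInputs() {
    return ImmutableList.<PCollectionView<?>>of(datasetForShard);
  }

  @Override
  public String getDestination(ValueInSingleWindow<TableRow> element) {
    return (String) element.getValue().get("shard");
  }

  @Override
  public TableDestination getTable(String shard) {
    // Legal because the callers (PrepareWrite, WriteTables) install the
    // side-input accessor from their ProcessContext before invoking this.
    String dataset = sideInput(datasetForShard).get(shard);
    return new TableDestination(dataset + ".events_" + shard, "events for shard " + shard);
  }

  @Override
  public TableSchema getSchema(String shard) {
    return new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("shard").setType("STRING")));
  }
}

The StreamingWriteTables change below serves the same mechanism: it re-windows into the global window before the final write so that, per the comment in that diff, DynamicDestinations side inputs are accessed correctly.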
http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java index efd9c31..9cb0027 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java @@ -19,42 +19,33 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** -* PTransform that performs streaming BigQuery write. To increase consistency, -* it leverages BigQuery best effort de-dup mechanism. + * PTransform that performs streaming BigQuery write. To increase consistency, it leverages + * BigQuery's best effort de-dup mechanism. */ -public class StreamingInserts extends - PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> { - private final Write<?> write; +public class StreamingInserts<DestinationT> + extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> { + private BigQueryServices bigQueryServices; + private final CreateDisposition createDisposition; + private final DynamicDestinations<?, DestinationT> dynamicDestinations; - private static class ConstantSchemaFunction implements - SerializableFunction<TableDestination, TableSchema> { - private final @Nullable String jsonSchema; - - ConstantSchemaFunction(TableSchema schema) { - this.jsonSchema = BigQueryHelpers.toJsonString(schema); - } - - @Override - @Nullable - public TableSchema apply(TableDestination table) { - return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class); - } + /** Constructor. */ + StreamingInserts(CreateDisposition createDisposition, + DynamicDestinations<?, DestinationT> dynamicDestinations) { + this.createDisposition = createDisposition; + this.dynamicDestinations = dynamicDestinations; + this.bigQueryServices = new BigQueryServicesImpl(); } - /** Constructor. */ - StreamingInserts(Write<?> write) { - this.write = write; + void setTestServices(BigQueryServices bigQueryServices) { + this.bigQueryServices = bigQueryServices; } @Override @@ -63,17 +54,13 @@ public class StreamingInserts extends } @Override - public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) { - // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant - // schema function here. If no schema is specified, this function will return null. 
- SerializableFunction<TableDestination, TableSchema> schemaFunction = - new ConstantSchemaFunction(write.getSchema()); - - PCollection<KV<TableDestination, TableRow>> writes = input - .apply("CreateTables", new CreateTables(write.getCreateDisposition(), schemaFunction) - .withTestServices(write.getBigQueryServices())); + public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) { + PCollection<KV<TableDestination, TableRow>> writes = + input.apply( + "CreateTables", + new CreateTables<DestinationT>(createDisposition, dynamicDestinations) + .withTestServices(bigQueryServices)); - return writes.apply(new StreamingWriteTables() - .withTestServices(write.getBigQueryServices())); + return writes.apply(new StreamingWriteTables().withTestServices(bigQueryServices)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java index 4d130b6..20b47e1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java @@ -22,6 +22,9 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -77,6 +80,11 @@ public class StreamingWriteTables extends PTransform< tagged .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) + // Put in the global window to ensure that DynamicDestinations side inputs are accessed + // correctly. 
+ .apply("GlobalWindow", + Window.<KV<ShardedKey<String>, TableRowInfo>>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()).discardingFiredPanes()) .apply("StreamingWrite", ParDo.of( new StreamingWriteFn(bigQueryServices))); http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index c418804..8a06d13 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -41,7 +41,7 @@ public class TableDestinationCoder extends CustomCoder<TableDestination> { throw new CoderException("cannot encode a null value"); } stringCoder.encode(value.getTableSpec(), outStream, context.nested()); - stringCoder.encode(value.getTableDescription(), outStream, context); + stringCoder.encode(value.getTableDescription(), outStream, context.nested()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 5f89067..4f609b2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.io.Serializable; import java.util.Map; import java.util.UUID; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -37,64 +38,66 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Writes each bundle of {@link TableRow} elements out to a separate file using - * {@link TableRowWriter}. + * Writes each bundle of {@link TableRow} elements out to a separate file using {@link + * TableRowWriter}. */ -class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBundlesToFiles.Result> { +class WriteBundlesToFiles<DestinationT> + extends DoFn<KV<DestinationT, TableRow>, WriteBundlesToFiles.Result<DestinationT>> { private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFiles.class); // Map from tablespec to a writer for that table. - private transient Map<TableDestination, TableRowWriter> writers; + private transient Map<DestinationT, TableRowWriter> writers; private final String tempFilePrefix; /** * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file, * and encapsulates the table it is destined to as well as the file byte size. 
*/ - public static final class Result implements Serializable { + public static final class Result<DestinationT> implements Serializable { private static final long serialVersionUID = 1L; public final String filename; public final Long fileByteSize; - public final TableDestination tableDestination; + public final DestinationT destination; - public Result(String filename, Long fileByteSize, TableDestination tableDestination) { + public Result(String filename, Long fileByteSize, DestinationT destination) { this.filename = filename; this.fileByteSize = fileByteSize; - this.tableDestination = tableDestination; + this.destination = destination; } } - /** - * a coder for the {@link Result} class. - */ - public static class ResultCoder extends CustomCoder<Result> { - private static final ResultCoder INSTANCE = new ResultCoder(); + /** a coder for the {@link Result} class. */ + public static class ResultCoder<DestinationT> extends CustomCoder<Result<DestinationT>> { private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); private static final VarLongCoder longCoder = VarLongCoder.of(); - private static final TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of(); + private final Coder<DestinationT> destinationCoder; - public static ResultCoder of() { - return INSTANCE; + public static <DestinationT> ResultCoder<DestinationT> of( + Coder<DestinationT> destinationCoder) { + return new ResultCoder<>(destinationCoder); + } + + ResultCoder(Coder<DestinationT> destinationCoder) { + this.destinationCoder = destinationCoder; } @Override - public void encode(Result value, OutputStream outStream, Context context) + public void encode(Result<DestinationT> value, OutputStream outStream, Context context) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); } stringCoder.encode(value.filename, outStream, context.nested()); longCoder.encode(value.fileByteSize, outStream, context.nested()); - tableDestinationCoder.encode(value.tableDestination, outStream, context.nested()); + destinationCoder.encode(value.destination, outStream, context.nested()); } @Override - public Result decode(InputStream inStream, Context context) - throws IOException { + public Result<DestinationT> decode(InputStream inStream, Context context) throws IOException { String filename = stringCoder.decode(inStream, context.nested()); long fileByteSize = longCoder.decode(inStream, context.nested()); - TableDestination tableDestination = tableDestinationCoder.decode(inStream, context.nested()); - return new Result(filename, fileByteSize, tableDestination); + DestinationT destination = destinationCoder.decode(inStream, context.nested()); + return new Result<>(filename, fileByteSize, destination); } @Override @@ -138,9 +141,9 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund @FinishBundle public void finishBundle(Context c) throws Exception { - for (Map.Entry<TableDestination, TableRowWriter> entry : writers.entrySet()) { + for (Map.Entry<DestinationT, TableRowWriter> entry : writers.entrySet()) { TableRowWriter.Result result = entry.getValue().close(); - c.output(new Result(result.resourceId.toString(), result.byteSize, entry.getKey())); + c.output(new Result<>(result.resourceId.toString(), result.byteSize, entry.getKey())); } writers.clear(); } @@ -149,8 +152,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund public void populateDisplayData(DisplayData.Builder builder) { 
super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) - .withLabel("Temporary File Prefix")); + builder.addIfNotNull( + DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 0ae1768..4136fa0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.api.services.bigquery.model.TableReference; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; @@ -27,7 +26,6 @@ import java.util.UUID; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; @@ -37,12 +35,12 @@ import org.apache.beam.sdk.values.TupleTag; * Partitions temporary files based on number of files and file sizes. Output key is a pair of * tablespec and the list of files corresponding to each partition of that table. 
*/ -class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<String>>> { - private final ValueProvider<String> singletonOutputJsonTableRef; - private final String singletonOutputTableDescription; - private final PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView; - private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag; - private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag; +class WritePartition<DestinationT> + extends DoFn<String, KV<ShardedKey<DestinationT>, List<String>>> { + private final boolean singletonTable; + private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView; + private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag; + private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag; private static class PartitionData { private int numFiles = 0; @@ -66,7 +64,7 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List< } List<String> getFilenames() { - return filenames; + return filenames; } void addFilename(String filename) { @@ -98,18 +96,16 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List< } void addPartition(PartitionData partition) { - partitions.add(partition); + partitions.add(partition); } } WritePartition( - ValueProvider<String> singletonOutputJsonTableRef, - String singletonOutputTableDescription, - PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView, - TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag, - TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag) { - this.singletonOutputJsonTableRef = singletonOutputJsonTableRef; - this.singletonOutputTableDescription = singletonOutputTableDescription; + boolean singletonTable, + PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView, + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag, + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) { + this.singletonTable = singletonTable; this.resultsView = resultsView; this.multiPartitionsTag = multiPartitionsTag; this.singlePartitionTag = singlePartitionTag; @@ -117,30 +113,31 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List< @ProcessElement public void processElement(ProcessContext c) throws Exception { - List<WriteBundlesToFiles.Result> results = Lists.newArrayList(c.sideInput(resultsView)); + List<WriteBundlesToFiles.Result<DestinationT>> results = + Lists.newArrayList(c.sideInput(resultsView)); // If there are no elements to write _and_ the user specified a constant output table, then // generate an empty table of that name. - if (results.isEmpty() && singletonOutputJsonTableRef != null) { - TableReference singletonTable = BigQueryHelpers.fromJsonString( - singletonOutputJsonTableRef.get(), TableReference.class); - if (singletonTable != null) { + if (results.isEmpty() && singletonTable) { TableRowWriter writer = new TableRowWriter(c.element()); writer.open(UUID.randomUUID().toString()); TableRowWriter.Result writerResult = writer.close(); - results.add(new Result(writerResult.resourceId.toString(), writerResult.byteSize, - new TableDestination(singletonTable, singletonOutputTableDescription))); - } + // Return a null destination in this case - the constant DynamicDestinations class will + // resolve it to the singleton output table. 
+ results.add( + new Result<DestinationT>( + writerResult.resourceId.toString(), + writerResult.byteSize, + null)); } - - Map<TableDestination, DestinationData> currentResults = Maps.newHashMap(); - for (WriteBundlesToFiles.Result fileResult : results) { - TableDestination tableDestination = fileResult.tableDestination; - DestinationData destinationData = currentResults.get(tableDestination); + Map<DestinationT, DestinationData> currentResults = Maps.newHashMap(); + for (WriteBundlesToFiles.Result<DestinationT> fileResult : results) { + DestinationT destination = fileResult.destination; + DestinationData destinationData = currentResults.get(destination); if (destinationData == null) { destinationData = new DestinationData(); - currentResults.put(tableDestination, destinationData); + currentResults.put(destination, destinationData); } PartitionData latestPartition = destinationData.getLatestPartition(); @@ -156,18 +153,18 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List< // Now that we've figured out which tables and partitions to write out, emit this information // to the next stage. - for (Map.Entry<TableDestination, DestinationData> entry : currentResults.entrySet()) { - TableDestination tableDestination = entry.getKey(); + for (Map.Entry<DestinationT, DestinationData> entry : currentResults.entrySet()) { + DestinationT destination = entry.getKey(); DestinationData destinationData = entry.getValue(); // In the fast-path case where we only output one table, the transform loads it directly // to the final table. In this case, we output on a special TupleTag so the enclosing // transform knows to skip the rename step. - TupleTag<KV<ShardedKey<TableDestination>, List<String>>> outputTag = + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> outputTag = (destinationData.getPartitions().size() == 1) ? 
singlePartitionTag : multiPartitionsTag; for (int i = 0; i < destinationData.getPartitions().size(); ++i) { PartitionData partitionData = destinationData.getPartitions().get(i); - c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1), - partitionData.getFilenames())); + c.output( + outputTag, KV.of(ShardedKey.of(destination, i + 1), partitionData.getFilenames())); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index f336849..b299244 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -23,12 +23,14 @@ import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collection; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; @@ -39,7 +41,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.FileIOChannelFactory; import org.apache.beam.sdk.util.GcsIOChannelFactory; @@ -52,72 +53,86 @@ import org.apache.beam.sdk.values.PCollectionView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Writes partitions to BigQuery tables. * - * <p>The input is a list of files corresponding to each partition of a table. These files are - * load into a temporary table (or into the final table if there is only one partition). The output - * is a {@link KV} mapping each final table to a list of the temporary tables containing its data. + * <p>The input is a list of files corresponding to each partition of a table. These files are + * loaded into a temporary table (or into the final table if there is only one partition). The + * output is a {@link KV} mapping each final table to a list of the temporary tables containing its + * data. * * <p>In the case where all the data in the files fit into a single load job, this transform loads * the data directly into the final table, skipping temporary tables. In this case, the output * {@link KV} maps the final table to itself.
*/ -class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>, - KV<TableDestination, String>> { +class WriteTables<DestinationT> + extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> { private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class); private final boolean singlePartition; private final BigQueryServices bqServices; private final PCollectionView<String> jobIdToken; + private final PCollectionView<Map<DestinationT, String>> schemasView; private final String tempFilePrefix; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; - private final SerializableFunction<TableDestination, TableSchema> schemaFunction; + private final DynamicDestinations<?, DestinationT> dynamicDestinations; public WriteTables( boolean singlePartition, BigQueryServices bqServices, PCollectionView<String> jobIdToken, + PCollectionView<Map<DestinationT, String>> schemasView, String tempFilePrefix, WriteDisposition writeDisposition, CreateDisposition createDisposition, - SerializableFunction<TableDestination, TableSchema> schemaFunction) { + DynamicDestinations<?, DestinationT> dynamicDestinations) { this.singlePartition = singlePartition; this.bqServices = bqServices; this.jobIdToken = jobIdToken; + this.schemasView = schemasView; this.tempFilePrefix = tempFilePrefix; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; - this.schemaFunction = schemaFunction; + this.dynamicDestinations = dynamicDestinations; } @ProcessElement public void processElement(ProcessContext c) throws Exception { - TableDestination tableDestination = c.element().getKey().getKey(); + dynamicDestinations.setSideInputAccessorFromProcessContext(c); + DestinationT destination = c.element().getKey().getKey(); + TableSchema tableSchema = + BigQueryHelpers.fromJsonString( + c.sideInput(schemasView).get(destination), TableSchema.class); + TableDestination tableDestination = dynamicDestinations.getTable(destination); + TableReference tableReference = tableDestination.getTableReference(); + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + tableReference.setProjectId( + c.getPipelineOptions().as(BigQueryOptions.class).getProject()); + tableDestination = new TableDestination( + tableReference, tableDestination.getTableDescription()); + } + Integer partition = c.element().getKey().getShardNumber(); List<String> partitionFiles = Lists.newArrayList(c.element().getValue()); - String jobIdPrefix = BigQueryHelpers.createJobId( - c.sideInput(jobIdToken), tableDestination, partition); + String jobIdPrefix = + BigQueryHelpers.createJobId(c.sideInput(jobIdToken), tableDestination, partition); - TableReference ref = tableDestination.getTableReference(); if (!singlePartition) { - ref.setTableId(jobIdPrefix); + tableReference.setTableId(jobIdPrefix); } - TableSchema schema = (schemaFunction != null) ? 
schemaFunction.apply(tableDestination) : null; load( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, - ref, - schema, + tableReference, + tableSchema, partitionFiles, writeDisposition, createDisposition, tableDestination.getTableDescription()); - c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(ref))); + c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference))); removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles); } @@ -131,22 +146,22 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>, List<String> gcsUris, WriteDisposition writeDisposition, CreateDisposition createDisposition, - @Nullable String tableDescription) throws InterruptedException, IOException { - JobConfigurationLoad loadConfig = new JobConfigurationLoad() - .setDestinationTable(ref) - .setSchema(schema) - .setSourceUris(gcsUris) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat("NEWLINE_DELIMITED_JSON"); + @Nullable String tableDescription) + throws InterruptedException, IOException { + JobConfigurationLoad loadConfig = + new JobConfigurationLoad() + .setDestinationTable(ref) + .setSchema(schema) + .setSourceUris(gcsUris) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat("NEWLINE_DELIMITED_JSON"); String projectId = ref.getProjectId(); Job lastFailedLoadJob = null; for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - JobReference jobRef = new JobReference() - .setProjectId(projectId) - .setJobId(jobId); + JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES); Status jobStatus = BigQueryHelpers.parseStatus(loadJob); @@ -157,31 +172,31 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>, } return; case UNKNOWN: - throw new RuntimeException(String.format( - "UNKNOWN status of load job [%s]: %s.", jobId, - BigQueryHelpers.jobToPrettyString(loadJob))); + throw new RuntimeException( + String.format( + "UNKNOWN status of load job [%s]: %s.", + jobId, BigQueryHelpers.jobToPrettyString(loadJob))); case FAILED: lastFailedLoadJob = loadJob; continue; default: - throw new IllegalStateException(String.format( - "Unexpected status [%s] of load job: %s.", - jobStatus, BigQueryHelpers.jobToPrettyString(loadJob))); + throw new IllegalStateException( + String.format( + "Unexpected status [%s] of load job: %s.", + jobStatus, BigQueryHelpers.jobToPrettyString(loadJob))); } } - throw new RuntimeException(String.format( - "Failed to create load job with id prefix %s, " - + "reached max retries: %d, last failed load job: %s.", - jobIdPrefix, - Write.MAX_RETRY_JOBS, - BigQueryHelpers.jobToPrettyString(lastFailedLoadJob))); + throw new RuntimeException( + String.format( + "Failed to create load job with id prefix %s, " + + "reached max retries: %d, last failed load job: %s.", + jobIdPrefix, + Write.MAX_RETRY_JOBS, + BigQueryHelpers.jobToPrettyString(lastFailedLoadJob))); } static void removeTemporaryFiles( - PipelineOptions options, - String tempFilePrefix, - Collection<String> files) - throws IOException { + PipelineOptions options, String tempFilePrefix, Collection<String> files) throws IOException { 
IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix); if (factory instanceof GcsIOChannelFactory) { GcsUtil gcsUtil = new GcsUtilFactory().create(options); @@ -203,8 +218,7 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>, public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) - .withLabel("Temporary File Prefix")); + builder.addIfNotNull( + DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index ef3419e..e267dab 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -24,7 +24,10 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdTok import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -44,6 +47,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -66,6 +70,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -206,7 +212,13 @@ public class BigQueryIOTest implements Serializable { assertEquals(project, write.getTable().get().getProjectId()); assertEquals(dataset, write.getTable().get().getDatasetId()); assertEquals(table, write.getTable().get().getTableId()); - assertEquals(schema, write.getSchema()); + if (schema == null) { + assertNull(write.getJsonSchema()); + assertNull(write.getSchemaFromView()); + } else { + assertEquals(schema, BigQueryHelpers.fromJsonString( + write.getJsonSchema().get(), TableSchema.class)); + } assertEquals(createDisposition, write.getCreateDisposition()); assertEquals(writeDisposition, write.getWriteDisposition()); assertEquals(tableDescription, write.getTableDescription()); @@ -443,6 +455,127 @@ public class BigQueryIOTest implements Serializable { p.run(); } + // Create an intermediate type to ensure 
that coder inference up the inheritance tree is tested. + abstract static class StringIntegerDestinations extends DynamicDestinations<String, Integer> { + } + + @Test + public void testWriteDynamicDestinationsBatch() throws Exception { + writeDynamicDestinations(false); + } + + @Test + public void testWriteDynamicDestinationsStreaming() throws Exception { + writeDynamicDestinations(true); + } + + public void writeDynamicDestinations(boolean streaming) throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("project-id"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeDatasetService datasetService = new FakeDatasetService(); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService()) + .withDatasetService(datasetService); + + datasetService.createDataset("project-id", "dataset-id", "", ""); + + final Pattern userPattern = Pattern.compile("([a-z]+)([0-9]+)"); + Pipeline p = TestPipeline.create(bqOptions); + + final PCollectionView<List<String>> sideInput1 = + p.apply("Create SideInput 1", Create.of("a", "b", "c").withCoder(StringUtf8Coder.of())) + .apply("asList", View.<String>asList()); + final PCollectionView<Map<String, String>> sideInput2 = + p.apply("Create SideInput2", Create.of(KV.of("a", "a"), KV.of("b", "b"), KV.of("c", "c"))) + .apply("AsMap", View.<String, String>asMap()); + + PCollection<String> users = p.apply("CreateUsers", + Create.of("bill1", "sam2", "laurence3") + .withCoder(StringUtf8Coder.of())) + .apply(Window.into(new PartitionedGlobalWindows<>( + new SerializableFunction<String, String>() { + @Override + public String apply(String arg) { + return arg; + } + }))); + + if (streaming) { + users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + } + users.apply("WriteBigQuery", BigQueryIO.<String>write() + .withTestServices(fakeBqServices) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withFormatFunction(new SerializableFunction<String, TableRow>() { + @Override + public TableRow apply(String user) { + Matcher matcher = userPattern.matcher(user); + if (matcher.matches()) { + return new TableRow().set("name", matcher.group(1)) + .set("id", Integer.valueOf(matcher.group(2))); + } + throw new RuntimeException("Unmatching element " + user); + } + }) + .to(new StringIntegerDestinations() { + @Override + public Integer getDestination(ValueInSingleWindow<String> element) { + assertThat(element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class)); + Matcher matcher = userPattern.matcher(element.getValue()); + if (matcher.matches()) { + // Since we name tables by userid, we can simply store an Integer to represent + // a table. + return Integer.valueOf(matcher.group(2)); + } + throw new RuntimeException("Unmatching destination " + element.getValue()); + } + + @Override + public TableDestination getTable(Integer userId) { + verifySideInputs(); + // Each user in its own table.
+ return new TableDestination("dataset-id.userid-" + userId, + "table for userid " + userId); + } + + @Override + public TableSchema getSchema(Integer userId) { + verifySideInputs(); + return new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("id").setType("INTEGER"))); + } + + @Override + public List<PCollectionView<?>> getSideInputs() { + return ImmutableList.of(sideInput1, sideInput2); + } + + private void verifySideInputs() { + assertThat(sideInput(sideInput1), containsInAnyOrder("a", "b", "c")); + Map<String, String> mapSideInput = sideInput(sideInput2); + assertEquals(3, mapSideInput.size()); + assertThat(mapSideInput, + allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c"))); + } + }) + .withoutValidation()); + p.run(); + + File tempDir = new File(bqOptions.getTempLocation()); + testNumFiles(tempDir, 0); + + assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-1"), + containsInAnyOrder(new TableRow().set("name", "bill").set("id", 1))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-2"), + containsInAnyOrder(new TableRow().set("name", "sam").set("id", 2))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-3"), + containsInAnyOrder(new TableRow().set("name", "laurence").set("id", 3))); + } + @Test public void testWrite() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); @@ -571,8 +704,6 @@ public class BigQueryIOTest implements Serializable { return GlobalWindow.INSTANCE.maxTimestamp(); } - // The following methods are only needed due to BEAM-1022. Once this issue is fixed, we will - // no longer need these. @Override public boolean equals(Object other) { if (other instanceof PartitionedGlobalWindow) { @@ -635,7 +766,7 @@ public class BigQueryIOTest implements Serializable { // Create a windowing strategy that puts the input into five different windows depending on // record value. - WindowFn<Integer, PartitionedGlobalWindow> window = new PartitionedGlobalWindows( + WindowFn<Integer, PartitionedGlobalWindow> windowFn = new PartitionedGlobalWindows( new SerializableFunction<Integer, String>() { @Override public String apply(Integer i) { @@ -644,24 +775,44 @@ } ); + final Map<Integer, TableDestination> targetTables = Maps.newHashMap(); + Map<String, String> schemas = Maps.newHashMap(); + for (int i = 0; i < 5; i++) { + TableDestination destination = new TableDestination("project-id:dataset-id" + + ".table-id-" + i, ""); + targetTables.put(i, destination); + // Make sure each target table has its own custom schema. + schemas.put(destination.getTableSpec(), + BigQueryHelpers.toJsonString(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"), + new TableFieldSchema().setName("custom_" + i).setType("STRING"))))); + } + SerializableFunction<ValueInSingleWindow<Integer>, TableDestination> tableFunction = new SerializableFunction<ValueInSingleWindow<Integer>, TableDestination>() { @Override public TableDestination apply(ValueInSingleWindow<Integer> input) { PartitionedGlobalWindow window = (PartitionedGlobalWindow) input.getWindow(); - // Check that we can access the element as well here. + // Check that we can access the element as well here and that it matches the window.
checkArgument(window.value.equals(Integer.toString(input.getValue() % 5)), "Incorrect element"); - return new TableDestination("project-id:dataset-id.table-id-" + window.value, ""); + return targetTables.get(input.getValue() % 5); } }; Pipeline p = TestPipeline.create(bqOptions); - PCollection<Integer> input = p.apply(Create.of(inserts)); + PCollection<Integer> input = p.apply("CreateSource", Create.of(inserts)); if (streaming) { input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); } - input.apply(Window.<Integer>into(window)) + + PCollectionView<Map<String, String>> schemasView = + p.apply("CreateSchemaMap", Create.of(schemas)) + .apply("ViewSchemaAsMap", View.<String, String>asMap()); + + input.apply(Window.<Integer>into(windowFn)) .apply(BigQueryIO.<Integer>write() .to(tableFunction) .withFormatFunction(new SerializableFunction<Integer, TableRow>() { @@ -670,35 +821,27 @@ public class BigQueryIOTest implements Serializable { return new TableRow().set("name", "number" + i).set("number", i); }}) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withSchema(new TableSchema().setFields( - ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("number").setType("INTEGER")))) + .withSchemaFromView(schemasView) .withTestServices(fakeBqServices) .withoutValidation()); p.run(); - - assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-0"), - containsInAnyOrder( - new TableRow().set("name", "number0").set("number", 0), - new TableRow().set("name", "number5").set("number", 5))); - assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-1"), - containsInAnyOrder( - new TableRow().set("name", "number1").set("number", 1), - new TableRow().set("name", "number6").set("number", 6))); - assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-2"), - containsInAnyOrder( - new TableRow().set("name", "number2").set("number", 2), - new TableRow().set("name", "number7").set("number", 7))); - assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-3"), - containsInAnyOrder( - new TableRow().set("name", "number3").set("number", 3), - new TableRow().set("name", "number8").set("number", 8))); - assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-4"), - containsInAnyOrder( - new TableRow().set("name", "number4").set("number", 4), - new TableRow().set("name", "number9").set("number", 9))); + for (int i = 0; i < 5; ++i) { + String tableId = String.format("table-id-%d", i); + String tableSpec = String.format("project-id:dataset-id.%s", tableId); + + // Verify that table was created with the correct schema. + assertThat(BigQueryHelpers.toJsonString( + datasetService.getTable(new TableReference().setProjectId("project-id") + .setDatasetId("dataset-id").setTableId(tableId)).getSchema()), + equalTo(schemas.get(tableSpec))); + + // Verify that the table has the expected contents. + assertThat(datasetService.getAllRows("project-id", "dataset-id", tableId), + containsInAnyOrder( + new TableRow().set("name", String.format("number%d", i)).set("number", i), + new TableRow().set("name", String.format("number%d", i + 5)).set("number", i + 5))); + } } @Test @@ -1585,72 +1728,59 @@ public class BigQueryIOTest implements Serializable { // In the case where a static destination is specified (i.e. not through a dynamic table // function) and there is no input data, WritePartition will generate an empty table. This // code is to test that path. 
- TableReference singletonReference = new TableReference() - .setProjectId("projectid") - .setDatasetId("dataset") - .setTableId("table"); - String singletonDescription = "singleton"; boolean isSingleton = numTables == 1 && numFilesPerTable == 0; - List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList(); + List<ShardedKey<String>> expectedPartitions = Lists.newArrayList(); if (isSingleton) { - expectedPartitions.add(ShardedKey.of( - new TableDestination(singletonReference, singletonDescription), 1)); + expectedPartitions.add(ShardedKey.<String>of(null, 1)); } else { for (int i = 0; i < numTables; ++i) { for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) { String tableName = String.format("project-id:dataset-id.tables%05d", i); - TableDestination destination = new TableDestination(tableName, tableName); - expectedPartitions.add(ShardedKey.of(destination, j)); + expectedPartitions.add(ShardedKey.of(tableName, j)); } } } - List<WriteBundlesToFiles.Result> files = Lists.newArrayList(); - Map<TableDestination, List<String>> filenamesPerTable = Maps.newHashMap(); + List<WriteBundlesToFiles.Result<String>> files = Lists.newArrayList(); + Map<String, List<String>> filenamesPerTable = Maps.newHashMap(); for (int i = 0; i < numTables; ++i) { String tableName = String.format("project-id:dataset-id.tables%05d", i); - TableDestination destination = new TableDestination(tableName, tableName); - List<String> filenames = filenamesPerTable.get(destination); + List<String> filenames = filenamesPerTable.get(tableName); if (filenames == null) { filenames = Lists.newArrayList(); - filenamesPerTable.put(destination, filenames); + filenamesPerTable.put(tableName, filenames); } for (int j = 0; j < numFilesPerTable; ++j) { String fileName = String.format("%s_files%05d", tableName, j); filenames.add(fileName); - files.add(new Result(fileName, fileSize, destination)); + files.add(new Result<>(fileName, fileSize, tableName)); } } - TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag = - new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag = - new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {}; + TupleTag<KV<ShardedKey<String>, List<String>>> multiPartitionsTag = + new TupleTag<KV<ShardedKey<String>, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<ShardedKey<String>, List<String>>> singlePartitionTag = + new TupleTag<KV<ShardedKey<String>, List<String>>>("singlePartitionTag") {}; - PCollection<WriteBundlesToFiles.Result> filesPCollection = - p.apply(Create.of(files).withType(new TypeDescriptor<WriteBundlesToFiles.Result>() {})); - PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = + PCollection<WriteBundlesToFiles.Result<String>> filesPCollection = + p.apply(Create.of(files) + .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of()))); + PCollectionView<Iterable<WriteBundlesToFiles.Result<String>>> resultsView = PCollectionViews.iterableView( filesPCollection, WindowingStrategy.globalDefault(), - WriteBundlesToFiles.ResultCoder.of()); + WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of())); - ValueProvider<String> singletonTable = null; - if (isSingleton) { - singletonTable = StaticValueProvider.of(BigQueryHelpers.toJsonString(singletonReference)); - } - WritePartition writePartition = - new WritePartition(singletonTable, - "singleton", resultsView, - multiPartitionsTag, 
singlePartitionTag); + WritePartition<String> writePartition = + new WritePartition<>(isSingleton, resultsView, multiPartitionsTag, singlePartitionTag); - DoFnTester<String, KV<ShardedKey<TableDestination>, List<String>>> tester = + DoFnTester<String, KV<ShardedKey<String>, List<String>>> tester = DoFnTester.of(writePartition); tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files); tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); - List<KV<ShardedKey<TableDestination>, List<String>>> partitions; + List<KV<ShardedKey<String>, List<String>>> partitions; if (expectedNumPartitionsPerTable > 1) { partitions = tester.takeOutputElements(multiPartitionsTag); } else { @@ -1658,10 +1788,10 @@ public class BigQueryIOTest implements Serializable { } - List<ShardedKey<TableDestination>> partitionsResult = Lists.newArrayList(); - Map<TableDestination, List<String>> filesPerTableResult = Maps.newHashMap(); - for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) { - TableDestination table = partition.getKey().getKey(); + List<ShardedKey<String>> partitionsResult = Lists.newArrayList(); + Map<String, List<String>> filesPerTableResult = Maps.newHashMap(); + for (KV<ShardedKey<String>, List<String>> partition : partitions) { + String table = partition.getKey().getKey(); partitionsResult.add(partition.getKey()); List<String> tableFilesResult = filesPerTableResult.get(table); if (tableFilesResult == null) { @@ -1685,6 +1815,23 @@ public class BigQueryIOTest implements Serializable { } } + static class IdentityDynamicTables extends DynamicDestinations<String, String> { + @Override + public String getDestination(ValueInSingleWindow<String> element) { + throw new UnsupportedOperationException("getDestination not expected in this test."); + } + + @Override + public TableDestination getTable(String destination) { + return new TableDestination(destination, destination); + } + + @Override + public TableSchema getSchema(String destination) { + throw new UnsupportedOperationException("getSchema not expected in this test."); + } + } + @Test public void testWriteTables() throws Exception { p.enableAbandonedNodeEnforcement(false); @@ -1703,7 +1850,7 @@ public class BigQueryIOTest implements Serializable { Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables"); - List<KV<ShardedKey<TableDestination>, List<String>>> partitions = Lists.newArrayList(); + List<KV<ShardedKey<String>, List<String>>> partitions = Lists.newArrayList(); for (int i = 0; i < numTables; ++i) { String tableName = String.format("project-id:dataset-id.table%05d", i); TableDestination tableDestination = new TableDestination(tableName, tableName); @@ -1722,7 +1869,7 @@ public class BigQueryIOTest implements Serializable { } filesPerPartition.add(filename); } - partitions.add(KV.of(ShardedKey.of(tableDestination, j), + partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j), filesPerPartition)); List<String> expectedTables = expectedTempTables.get(tableDestination); @@ -1737,23 +1884,30 @@ public class BigQueryIOTest implements Serializable { } } - PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId")); - PCollectionView<String> jobIdTokenView = - jobIdTokenCollection.apply(View.<String>asSingleton()); - - WriteTables writeTables = new WriteTables( - false, - fakeBqServices, - jobIdTokenView, - tempFilePrefix, - WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED, - null); - - 
DoFnTester<KV<ShardedKey<TableDestination>, List<String>>, + PCollectionView<String> jobIdTokenView = p + .apply("CreateJobId", Create.of("jobId")) + .apply(View.<String>asSingleton()); + + PCollectionView<Map<String, String>> schemaMapView = + p.apply("CreateEmptySchema", + Create.empty(new TypeDescriptor<KV<String, String>>() {})) + .apply(View.<String, String>asMap()); + WriteTables<String> writeTables = + new WriteTables<>( + false, + fakeBqServices, + jobIdTokenView, + schemaMapView, + tempFilePrefix, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED, + new IdentityDynamicTables()); + + DoFnTester<KV<ShardedKey<String>, List<String>>, KV<TableDestination, String>> tester = DoFnTester.of(writeTables); tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken); - for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) { + tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.<String, String>of()); + for (KV<ShardedKey<String>, List<String>> partition : partitions) { tester.processElement(partition); } @@ -1867,9 +2021,9 @@ public class BigQueryIOTest implements Serializable { KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())); - PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId")); - PCollectionView<String> jobIdTokenView = - jobIdTokenCollection.apply(View.<String>asSingleton()); + PCollectionView<String> jobIdTokenView = p + .apply("CreateJobId", Create.of("jobId")) + .apply(View.<String>asSingleton()); WriteRename writeRename = new WriteRename( fakeBqServices,