Repository: beam Updated Branches: refs/heads/master dc597873e -> 6be9e0bb2
Support for using raw avro records from BigQuery Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c98c8049 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c98c8049 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c98c8049 Branch: refs/heads/master Commit: c98c8049fa3607c4cadbabffeba4bde493225540 Parents: dc59787 Author: steve <sniem...@twitter.com> Authored: Fri Sep 22 11:07:07 2017 -0400 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Thu Sep 28 16:05:39 2017 -0700 ---------------------------------------------------------------------- .../examples/cookbook/BigQueryTornadoes.java | 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../beam/examples/cookbook/JoinExamples.java | 6 +- .../examples/cookbook/MaxPerKeyExamples.java | 2 +- .../org/apache/beam/sdk/io/package-info.java | 2 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 407 ++++++++++++++----- .../io/gcp/bigquery/BigQueryQuerySource.java | 22 +- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 69 ++-- .../io/gcp/bigquery/BigQueryTableSource.java | 19 +- .../sdk/io/gcp/bigquery/SchemaAndRecord.java | 43 ++ .../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 2 + .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 100 +++-- 13 files changed, 498 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java index 07a3edd..df9ff5a 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java +++ 
b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java @@ -156,7 +156,7 @@ public class BigQueryTornadoes { fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER")); TableSchema schema = new TableSchema().setFields(fields); - p.apply(BigQueryIO.read().from(options.getInput())) + p.apply(BigQueryIO.readTableRows().from(options.getInput())) .apply(new CountTornadoes()) .apply(BigQueryIO.writeTableRows() .to(options.getOutput()) http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java index 693f0c4..1e91aec 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java @@ -195,7 +195,7 @@ public class CombinePerKeyExamples { fields.add(new TableFieldSchema().setName("all_plays").setType("STRING")); TableSchema schema = new TableSchema().setFields(fields); - p.apply(BigQueryIO.read().from(options.getInput())) + p.apply(BigQueryIO.readTableRows().from(options.getInput())) .apply(new PlaysForWord()) .apply(BigQueryIO.writeTableRows() .to(options.getOutput()) http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java index fed9db7..a4fe425 100644 --- 
a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -237,7 +237,7 @@ public class FilterExamples { TableSchema schema = buildWeatherSchemaProjection(); - p.apply(BigQueryIO.read().from(options.getInput())) + p.apply(BigQueryIO.readTableRows().from(options.getInput())) .apply(ParDo.of(new ProjectionFn())) .apply(new BelowGlobalMean(options.getMonthFilter())) .apply(BigQueryIO.writeTableRows() http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java index d1fffb4..ae8c59c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java @@ -166,8 +166,10 @@ public class JoinExamples { Pipeline p = Pipeline.create(options); // the following two 'applys' create multiple inputs to our pipeline, one for each // of our two input sources. 
- PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE)); - PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES)); + PCollection<TableRow> eventsTable = p.apply( + BigQueryIO.readTableRows().from(GDELT_EVENTS_TABLE)); + PCollection<TableRow> countryCodes = p.apply( + BigQueryIO.readTableRows().from(COUNTRY_CODES)); PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes); formattedResults.apply(TextIO.write().to(options.getOutput())); p.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java index 295b3f4..992580e 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java @@ -149,7 +149,7 @@ public class MaxPerKeyExamples { fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT")); TableSchema schema = new TableSchema().setFields(fields); - p.apply(BigQueryIO.read().from(options.getInput())) + p.apply(BigQueryIO.readTableRows().from(options.getInput())) .apply(new MaxMeanTemp()) .apply(BigQueryIO.writeTableRows() .to(options.getOutput()) http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java index 3fc8e32..dd6d009 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java @@ -24,7 +24,7 @@ * from existing storage: * <pre>{@code * PCollection<TableRow> inputData = pipeline.apply( - * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations")); + * BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations")); * }</pre> * and {@code Write} transforms that persist PCollections to external storage: * <pre> {@code http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 9e1dbfe..2771687 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -45,11 +45,13 @@ import java.util.List; import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; +import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; @@ -93,6 +95,8 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import 
org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Duration; import org.slf4j.Logger; @@ -125,28 +129,44 @@ import org.slf4j.LoggerFactory; * * <h3>Reading</h3> * - * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation. This produces a - * {@link PCollection} of {@link TableRow TableRows} as output: + * <p>Reading from BigQuery is supported by {@link #read(SerializableFunction)}, which parses + * records in <a href="https://cloud.google.com/bigquery/data-formats#avro_format">AVRO format</a> + * into a custom type using a specified parse function, and by {@link #readTableRows} which parses + * them into {@link TableRow}, which may be more convenient but has lower performance. * + * <p>Both functions support reading either from a table or from the result of a query, via + * {@link TypedRead#from(String)} and {@link TypedRead#fromQuery} respectively. Exactly one + * of these must be specified. + * + * <b>Example: Reading rows of a table as {@link TableRow}.</b> * <pre>{@code * PCollection<TableRow> weatherData = pipeline.apply( - * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations")); + * BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations")); * }</pre> * - * <p>See {@link TableRow} for more information on the {@link TableRow} object. 
+ * <b>Example: Reading rows of a table and parsing them into a custom type.</b> + * <pre>{@code + * PCollection<WeatherRecord> weatherData = pipeline.apply( + * BigQueryIO + * .read(new SerializableFunction<SchemaAndRecord, WeatherRecord>() { + * public WeatherRecord apply(SchemaAndRecord schemaAndRecord) { + * return new WeatherRecord(...); + * } + * }) + * .from("clouddataflow-readonly:samples.weather_stations") + * .withCoder(SerializableCoder.of(WeatherRecord.class))); + * }</pre> * - * <p>Users may provide a query to read from rather than reading all of a BigQuery table. If - * specified, the result obtained by executing the specified query will be used as the data of the - * input transform. + * <p>Note: When using {@link #read(SerializableFunction)}, you may sometimes need to use + * {@link TypedRead#withCoder(Coder)} to specify a {@link Coder} for the result type, if Beam + * fails to infer it automatically. * + * <b>Example: Reading results of a query as {@link TableRow}.</b> * <pre>{@code - * PCollection<TableRow> meanTemperatureData = pipeline.apply( - * BigQueryIO.read().fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]")); + * PCollection<TableRow> meanTemperatureData = pipeline.apply(BigQueryIO.readTableRows() + * .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]")); * }</pre> * - * <p>When creating a BigQuery input transform, users should provide either a query or a table. - * Pipeline construction will fail with a validation error if neither or both are specified. - * * <h3>Writing</h3> * * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes @@ -288,67 +308,116 @@ public class BigQueryIO { }; /** - * A {@link PTransform} that reads from a BigQuery table and returns a - * {@link PCollection} of {@link TableRow TableRows} containing each of the rows of the table. + * @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead.
+ {@link #readTableRows()} does exactly the same as {@link #read()}; however, + {@link #read(SerializableFunction)} performs better. + */ + @Deprecated + public static Read read() { + return new Read(); + } + + /** + * Like {@link #read(SerializableFunction)} but represents each row as a {@link TableRow}. + * + * <p>This method is more convenient to use in some cases, but usually has significantly lower + * performance than using {@link #read(SerializableFunction)} directly to parse data into a + * domain-specific type, due to the overhead of converting the rows to {@link TableRow}. + */ + public static TypedRead<TableRow> readTableRows() { + return read(new TableRowParser()).withCoder(TableRowJsonCoder.of()); + } + + /** + * Reads from a BigQuery table or query and returns a {@link PCollection} with one element for + * each row of the table or query result, parsed from the BigQuery AVRO format using the specified + * function. * - * <p>Each {@link TableRow} contains values indexed by column name. Here is a - * sample processing function that processes a "line" column from rows: + * <p>Each {@link SchemaAndRecord} contains a BigQuery {@link TableSchema} and a + * {@link GenericRecord} representing the row, indexed by column name. Here is a + * sample parse function that parses click events from a table. * * <pre>{@code - * static class ExtractWordsFn extends DoFn<TableRow, String> { - * public void processElement(ProcessContext c) { - * // Get the "line" field of the TableRow object, split it into words, and emit them.
- * TableRow row = c.element(); - * String[] words = row.get("line").toString().split("[^a-zA-Z']+"); - * for (String word : words) { - * if (!word.isEmpty()) { - * c.output(word); - * } - * } + * p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, ClickEvent>() { + * public ClickEvent apply(SchemaAndRecord record) { + * GenericRecord r = record.getRecord(); + * return new ClickEvent((Long) r.get("userId"), (String) r.get("url")); * } - * } + * }).from("...")); * }</pre> */ - public static Read read() { - return new AutoValue_BigQueryIO_Read.Builder() + public static <T> TypedRead<T> read( + SerializableFunction<SchemaAndRecord, T> parseFn) { + return new AutoValue_BigQueryIO_TypedRead.Builder<T>() .setValidate(true) .setWithTemplateCompatibility(false) .setBigQueryServices(new BigQueryServicesImpl()) + .setParseFn(parseFn) .build(); } - /** Implementation of {@link #read}. */ - @AutoValue - public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> { - @Nullable abstract ValueProvider<String> getJsonTableRef(); - @Nullable abstract ValueProvider<String> getQuery(); - abstract boolean getValidate(); - @Nullable abstract Boolean getFlattenResults(); - @Nullable abstract Boolean getUseLegacySql(); + @VisibleForTesting + static class TableRowParser + implements SerializableFunction<SchemaAndRecord, TableRow> { - abstract Boolean getWithTemplateCompatibility(); + public static final TableRowParser INSTANCE = new TableRowParser(); - abstract BigQueryServices getBigQueryServices(); - abstract Builder toBuilder(); + public TableRow apply(SchemaAndRecord schemaAndRecord) { + return BigQueryAvroUtils.convertGenericRecordToTableRow( + schemaAndRecord.getRecord(), + schemaAndRecord.getTableSchema()); + } + } - @AutoValue.Builder - abstract static class Builder { - abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef); - abstract Builder setQuery(ValueProvider<String> query); - abstract Builder setValidate(boolean validate); - 
abstract Builder setFlattenResults(Boolean flattenResults); - abstract Builder setUseLegacySql(Boolean useLegacySql); + /** Implementation of {@link BigQueryIO#read()}. */ + public static class Read + extends PTransform<PBegin, PCollection<TableRow>> { + private final TypedRead<TableRow> inner; - abstract Builder setWithTemplateCompatibility(Boolean useTemplateCompatibility); + Read() { + this(BigQueryIO.read(TableRowParser.INSTANCE).withCoder(TableRowJsonCoder.of())); + } - abstract Builder setBigQueryServices(BigQueryServices bigQueryServices); - abstract Read build(); + Read(TypedRead<TableRow> inner) { + this.inner = inner; } - /** Ensures that methods of the from() / fromQuery() family are called at most once. */ - private void ensureFromNotCalledYet() { - checkState( - getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called"); + @Override + public PCollection<TableRow> expand(PBegin input) { + return input.apply(inner); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + this.inner.populateDisplayData(builder); + } + + boolean getValidate() { + return this.inner.getValidate(); + } + + ValueProvider<String> getQuery() { + return this.inner.getQuery(); + } + + Read withTestServices(BigQueryServices testServices) { + return new Read(this.inner.withTestServices(testServices)); + } + + /////////////////////////////////////////////////////////////////// + + /** + * Returns the table to read, or {@code null} if reading from a query instead. + */ + @Nullable + public ValueProvider<TableReference> getTableProvider() { + return this.inner.getTableProvider(); + } + + /** Returns the table to read, or {@code null} if reading from a query instead. */ + @Nullable + public TableReference getTable() { + return this.inner.getTable(); } /** @@ -356,18 +425,19 @@ public class BigQueryIO { * {@code "[dataset_id].[table_id]"} for tables within the current project. 
*/ public Read from(String tableSpec) { - return from(StaticValueProvider.of(tableSpec)); + return new Read(this.inner.from(tableSpec)); } /** Same as {@code from(String)}, but with a {@link ValueProvider}. */ public Read from(ValueProvider<String> tableSpec) { - ensureFromNotCalledYet(); - return toBuilder() - .setJsonTableRef( - NestedValueProvider.of( - NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), - new TableRefToJson())) - .build(); + return new Read(this.inner.from(tableSpec)); + } + + /** + * Read from table specified by a {@link TableReference}. + */ + public Read from(TableReference table) { + return new Read(this.inner.from(table)); } /** @@ -375,40 +445,28 @@ public class BigQueryIO { * * <p>By default, the query results will be flattened -- see "flattenResults" in the <a * href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">Jobs documentation</a> for - * more information. To disable flattening, use {@link BigQueryIO.Read#withoutResultFlattening}. + * more information. To disable flattening, use {@link Read#withoutResultFlattening}. * * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery Standard - * SQL dialect, use {@link BigQueryIO.Read#usingStandardSql}. + * SQL dialect, use {@link Read#usingStandardSql}. */ public Read fromQuery(String query) { - return fromQuery(StaticValueProvider.of(query)); + return new Read(this.inner.fromQuery(query)); } /** * Same as {@code fromQuery(String)}, but with a {@link ValueProvider}. */ public Read fromQuery(ValueProvider<String> query) { - ensureFromNotCalledYet(); - return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build(); + return new Read(this.inner.fromQuery(query)); } /** - * Read from table specified by a {@link TableReference}. 
- */ - public Read from(TableReference table) { - return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table))); - } - - private static final String QUERY_VALIDATION_FAILURE_ERROR = - "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the" - + " pipeline, This validation can be disabled using #withoutValidation."; - - /** * Disable validation that the table exists or the query succeeds prior to pipeline submission. * Basic validation (such as ensuring that a query or table is specified) still occurs. */ public Read withoutValidation() { - return toBuilder().setValidate(false).build(); + return new Read(this.inner.withoutValidation()); } /** @@ -419,7 +477,7 @@ public class BigQueryIO { * from a table will cause an error during validation. */ public Read withoutResultFlattening() { - return toBuilder().setFlattenResults(false).build(); + return new Read(this.inner.withoutResultFlattening()); } /** @@ -429,7 +487,7 @@ public class BigQueryIO { * from a table will cause an error during validation. */ public Read usingStandardSql() { - return toBuilder().setUseLegacySql(false).build(); + return new Read(this.inner.usingStandardSql()); } /** @@ -440,26 +498,94 @@ public class BigQueryIO { */ @Experimental(Experimental.Kind.SOURCE_SINK) public Read withTemplateCompatibility() { - return toBuilder().setWithTemplateCompatibility(true).build(); + return new Read(this.inner.withTemplateCompatibility()); } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * Implementation of {@link BigQueryIO#read(SerializableFunction)}. 
+ */ + @AutoValue + public abstract static class TypedRead<T> extends PTransform<PBegin, PCollection<T>> { + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef); + abstract Builder<T> setQuery(ValueProvider<String> query); + abstract Builder<T> setValidate(boolean validate); + abstract Builder<T> setFlattenResults(Boolean flattenResults); + abstract Builder<T> setUseLegacySql(Boolean useLegacySql); + abstract Builder<T> setWithTemplateCompatibility(Boolean useTemplateCompatibility); + abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices); + abstract TypedRead<T> build(); + + abstract Builder<T> setParseFn( + SerializableFunction<SchemaAndRecord, T> parseFn); + abstract Builder<T> setCoder(Coder<T> coder); + } + + @Nullable abstract ValueProvider<String> getJsonTableRef(); + @Nullable abstract ValueProvider<String> getQuery(); + abstract boolean getValidate(); + @Nullable abstract Boolean getFlattenResults(); + @Nullable abstract Boolean getUseLegacySql(); + + abstract Boolean getWithTemplateCompatibility(); + + abstract BigQueryServices getBigQueryServices(); + + abstract SerializableFunction<SchemaAndRecord, T> getParseFn(); + + @Nullable abstract Coder<T> getCoder(); @VisibleForTesting - Read withTestServices(BigQueryServices testServices) { - return toBuilder().setBigQueryServices(testServices).build(); + Coder<T> inferCoder(CoderRegistry coderRegistry) { + if (getCoder() != null) { + return getCoder(); + } + + TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(getParseFn()); + + String message = + "Unable to infer coder for output of parseFn. 
Specify it explicitly using withCoder()."; + checkArgument(descriptor != null, message); + try { + return coderRegistry.getCoder(descriptor); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException(message, e); + } } - private BigQuerySourceBase createSource(String jobUuid) { - BigQuerySourceBase source; + private BigQuerySourceBase<T> createSource(String jobUuid, Coder<T> coder) { + BigQuerySourceBase<T> source; if (getQuery() == null) { - source = BigQueryTableSource.create(jobUuid, getTableProvider(), getBigQueryServices()); + source = BigQueryTableSource.create( + jobUuid, + getTableProvider(), + getBigQueryServices(), + coder, + getParseFn()); } else { source = BigQueryQuerySource.create( - jobUuid, getQuery(), getFlattenResults(), getUseLegacySql(), getBigQueryServices()); + jobUuid, + getQuery(), + getFlattenResults(), + getUseLegacySql(), + getBigQueryServices(), + coder, + getParseFn()); } return source; } + private static final String QUERY_VALIDATION_FAILURE_ERROR = + "Validation of query \"%1$s\" failed. 
If the query depends on an earlier stage of the" + + " pipeline, This validation can be disabled using #withoutValidation."; + @Override public void validate(PipelineOptions options) { // Even if existence validation is disabled, we need to make sure that the BigQueryIO @@ -516,7 +642,7 @@ public class BigQueryIO { } @Override - public PCollection<TableRow> expand(PBegin input) { + public PCollection<T> expand(PBegin input) { ValueProvider<TableReference> table = getTableProvider(); if (table != null) { @@ -541,11 +667,13 @@ public class BigQueryIO { getFlattenResults() != null, "flattenResults should not be null if query is set"); checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set"); } + checkArgument(getParseFn() != null, "A parseFn is required"); Pipeline p = input.getPipeline(); + final Coder<T> coder = inferCoder(p.getCoderRegistry()); final PCollectionView<String> jobIdTokenView; - PCollection<String> jobIdTokenCollection = null; - PCollection<TableRow> rows; + PCollection<String> jobIdTokenCollection; + PCollection<T> rows; if (!getWithTemplateCompatibility()) { // Create a singleton job ID token at construction time. final String staticJobUuid = BigQueryHelpers.randomUUIDString(); @@ -553,7 +681,7 @@ public class BigQueryIO { p.apply("TriggerIdCreation", Create.of(staticJobUuid)) .apply("ViewId", View.<String>asSingleton()); // Apply the traditional Source model. - rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid))); + rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid, coder))); } else { // Create a singleton job ID token at execution time. 
jobIdTokenCollection = @@ -579,7 +707,7 @@ public class BigQueryIO { @ProcessElement public void processElement(ProcessContext c) throws Exception { String jobUuid = c.element(); - BigQuerySourceBase source = createSource(jobUuid); + BigQuerySourceBase<T> source = createSource(jobUuid, coder); BigQuerySourceBase.ExtractResult res = source.extractFiles(c.getPipelineOptions()); for (ResourceId file : res.extractedFiles) { @@ -600,23 +728,23 @@ public class BigQueryIO { .apply( "ReadFiles", ParDo.of( - new DoFn<String, TableRow>() { + new DoFn<String, T>() { @ProcessElement public void processElement(ProcessContext c) throws Exception { TableSchema schema = BigQueryHelpers.fromJsonString( c.sideInput(schemaView), TableSchema.class); String jobUuid = c.sideInput(jobIdTokenView); - BigQuerySourceBase source = createSource(jobUuid); - List<BoundedSource<TableRow>> sources = + BigQuerySourceBase<T> source = createSource(jobUuid, coder); + List<BoundedSource<T>> sources = source.createSources( ImmutableList.of( FileSystems.matchNewResource( c.element(), false /* is directory */)), schema); checkArgument(sources.size() == 1, "Expected exactly one source."); - BoundedSource<TableRow> avroSource = sources.get(0); - BoundedSource.BoundedReader<TableRow> reader = + BoundedSource<T> avroSource = sources.get(0); + BoundedSource.BoundedReader<T> reader = avroSource.createReader(c.getPipelineOptions()); for (boolean more = reader.start(); more; more = reader.advance()) { c.output(reader.getCurrent()); @@ -624,7 +752,7 @@ public class BigQueryIO { } }) .withSideInputs(schemaView, jobIdTokenView)) - .setCoder(TableRowJsonCoder.of()); + .setCoder(coder); } PassThroughThenCleanup.CleanupOperation cleanupOperation = new PassThroughThenCleanup.CleanupOperation() { @@ -653,7 +781,7 @@ public class BigQueryIO { } } }; - return rows.apply(new PassThroughThenCleanup<TableRow>(cleanupOperation, jobIdTokenView)); + return rows.apply(new PassThroughThenCleanup<T>(cleanupOperation, 
jobIdTokenView)); } @Override @@ -673,20 +801,91 @@ public class BigQueryIO { true); } - /** - * Returns the table to read, or {@code null} if reading from a query instead. - */ + /** Ensures that methods of the from() / fromQuery() family are called at most once. */ + private void ensureFromNotCalledYet() { + checkState( + getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called"); + } + + /** See {@link Read#getTableProvider()}. */ @Nullable public ValueProvider<TableReference> getTableProvider() { return getJsonTableRef() == null ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef()); } - /** Returns the table to read, or {@code null} if reading from a query instead. */ + + /** See {@link Read#getTable()}. */ @Nullable public TableReference getTable() { ValueProvider<TableReference> provider = getTableProvider(); return provider == null ? null : provider.get(); } + + /** + * Sets a {@link Coder} for the result of the parse function. This may be required if a coder + * can not be inferred automatically. + */ + public TypedRead<T> withCoder(Coder<T> coder) { + return toBuilder().setCoder(coder).build(); + } + + /** See {@link Read#from(String)}. */ + public TypedRead<T> from(String tableSpec) { + return from(StaticValueProvider.of(tableSpec)); + } + + /** See {@link Read#from(ValueProvider)}. */ + public TypedRead<T> from(ValueProvider<String> tableSpec) { + ensureFromNotCalledYet(); + return toBuilder() + .setJsonTableRef( + NestedValueProvider.of( + NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), + new TableRefToJson())) + .build(); + } + + /** See {@link Read#fromQuery(String)}. */ + public TypedRead<T> fromQuery(String query) { + return fromQuery(StaticValueProvider.of(query)); + } + + /** See {@link Read#fromQuery(ValueProvider)}. 
*/ + public TypedRead<T> fromQuery(ValueProvider<String> query) { + ensureFromNotCalledYet(); + return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build(); + } + + /** See {@link Read#from(TableReference)}. */ + public TypedRead<T> from(TableReference table) { + return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table))); + } + + /** See {@link Read#withoutValidation()}. */ + public TypedRead<T> withoutValidation() { + return toBuilder().setValidate(false).build(); + } + + /** See {@link Read#withoutResultFlattening()}. */ + public TypedRead<T> withoutResultFlattening() { + return toBuilder().setFlattenResults(false).build(); + } + + /** See {@link Read#usingStandardSql()}. */ + public TypedRead<T> usingStandardSql() { + return toBuilder().setUseLegacySql(false).build(); + } + + /** See {@link Read#withTemplateCompatibility()}. */ + @Experimental(Experimental.Kind.SOURCE_SINK) + public TypedRead<T> withTemplateCompatibility() { + return toBuilder().setWithTemplateCompatibility(true).build(); + } + + @VisibleForTesting + TypedRead<T> withTestServices(BigQueryServices testServices) { + return toBuilder().setBigQueryServices(testServices).build(); + } } static String getExtractDestinationUri(String extractDestinationDir) { http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java index b92f8cc..a2f8dd9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -32,11 +32,13 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,21 +48,25 @@ import org.slf4j.LoggerFactory; * A {@link BigQuerySourceBase} for querying BigQuery tables. */ @VisibleForTesting -class BigQueryQuerySource extends BigQuerySourceBase { +class BigQueryQuerySource<T> extends BigQuerySourceBase<T> { private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySource.class); - static BigQueryQuerySource create( + static <T>BigQueryQuerySource<T> create( String stepUuid, ValueProvider<String> query, Boolean flattenResults, Boolean useLegacySql, - BigQueryServices bqServices) { - return new BigQueryQuerySource( + BigQueryServices bqServices, + Coder<T> coder, + SerializableFunction<SchemaAndRecord, T> parseFn) { + return new BigQueryQuerySource<T>( stepUuid, query, flattenResults, useLegacySql, - bqServices); + bqServices, + coder, + parseFn); } private final ValueProvider<String> query; @@ -73,8 +79,10 @@ class BigQueryQuerySource extends BigQuerySourceBase { ValueProvider<String> query, Boolean flattenResults, Boolean useLegacySql, - BigQueryServices bqServices) { - super(stepUuid, bqServices); + BigQueryServices bqServices, + Coder<T> coder, + SerializableFunction<SchemaAndRecord, T> parseFn) { + super(stepUuid, 
bqServices, coder, parseFn); this.query = checkNotNull(query, "query"); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index a8e187e..ca900d6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -27,7 +27,6 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Function; import com.google.common.base.Supplier; @@ -63,7 +62,7 @@ import org.slf4j.LoggerFactory; * </ul> * ... */ -abstract class BigQuerySourceBase extends BoundedSource<TableRow> { +abstract class BigQuerySourceBase<T> extends BoundedSource<T> { private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceBase.class); // The maximum number of retries to poll a BigQuery job. 
@@ -72,11 +71,20 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { protected final String stepUuid; protected final BigQueryServices bqServices; - private transient List<BoundedSource<TableRow>> cachedSplitResult; + private transient List<BoundedSource<T>> cachedSplitResult; + private SerializableFunction<SchemaAndRecord, T> parseFn; + private Coder<T> coder; - BigQuerySourceBase(String stepUuid, BigQueryServices bqServices) { + BigQuerySourceBase( + String stepUuid, + BigQueryServices bqServices, + Coder<T> coder, + SerializableFunction<SchemaAndRecord, T> parseFn + ) { this.stepUuid = checkNotNull(stepUuid, "stepUuid"); this.bqServices = checkNotNull(bqServices, "bqServices"); + this.coder = checkNotNull(coder, "coder"); + this.parseFn = checkNotNull(parseFn, "parseFn"); } protected static class ExtractResult { @@ -109,12 +117,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { } @Override - public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { - throw new UnsupportedOperationException("BigQuery source must be split before being read"); - } - - @Override - public List<BoundedSource<TableRow>> split( + public List<BoundedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { // split() can be called multiple times, e.g. Dataflow runner may call it multiple times // with different desiredBundleSizeBytes in case the split() call produces too many sources. @@ -134,13 +137,18 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception; @Override + public BoundedReader<T> createReader(PipelineOptions options) throws IOException { + throw new UnsupportedOperationException("BigQuery source must be split before being read"); + } + + @Override public void validate() { // Do nothing, validation is done in BigQuery.Read. 
} @Override - public Coder<TableRow> getOutputCoder() { - return TableRowJsonCoder.of(); + public Coder<T> getOutputCoder() { + return coder; } private List<ResourceId> executeExtract( @@ -173,33 +181,34 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob); } - List<BoundedSource<TableRow>> createSources(List<ResourceId> files, TableSchema schema) + private static class TableSchemaFunction + implements Serializable, Function<String, TableSchema> { + @Nullable + @Override + public TableSchema apply(@Nullable String input) { + return BigQueryHelpers.fromJsonString(input, TableSchema.class); + } + } + + List<BoundedSource<T>> createSources(List<ResourceId> files, TableSchema schema) throws IOException, InterruptedException { - final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema); - SerializableFunction<GenericRecord, TableRow> function = - new SerializableFunction<GenericRecord, TableRow>() { + final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema); + SerializableFunction<GenericRecord, T> fnWrapper = + new SerializableFunction<GenericRecord, T>() { private Supplier<TableSchema> schema = Suppliers.memoize( Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema))); @Override - public TableRow apply(GenericRecord input) { - return BigQueryAvroUtils.convertGenericRecordToTableRow(input, schema.get()); - }}; - - List<BoundedSource<TableRow>> avroSources = Lists.newArrayList(); + public T apply(GenericRecord input) { + return parseFn.apply(new SchemaAndRecord(input, schema.get())); + } + }; + List<BoundedSource<T>> avroSources = Lists.newArrayList(); for (ResourceId file : files) { avroSources.add( - AvroSource.from(file.toString()).withParseFn(function, getOutputCoder())); + AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder())); } return ImmutableList.copyOf(avroSources); } - - private static class 
TableSchemaFunction implements Serializable, Function<String, TableSchema> { - @Nullable - @Override - public TableSchema apply(@Nullable String input) { - return BigQueryHelpers.fromJsonString(input, TableSchema.class); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index 83a5066..f717cb7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -26,10 +26,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,14 +40,16 @@ import org.slf4j.LoggerFactory; * A {@link BigQuerySourceBase} for reading BigQuery tables. 
*/ @VisibleForTesting -class BigQueryTableSource extends BigQuerySourceBase { +class BigQueryTableSource<T> extends BigQuerySourceBase<T> { private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class); - static BigQueryTableSource create( + static <T>BigQueryTableSource<T> create( String stepUuid, ValueProvider<TableReference> table, - BigQueryServices bqServices) { - return new BigQueryTableSource(stepUuid, table, bqServices); + BigQueryServices bqServices, + Coder<T> coder, + SerializableFunction<SchemaAndRecord, T> parseFn) { + return new BigQueryTableSource<>(stepUuid, table, bqServices, coder, parseFn); } private final ValueProvider<String> jsonTable; @@ -54,8 +58,11 @@ class BigQueryTableSource extends BigQuerySourceBase { private BigQueryTableSource( String stepUuid, ValueProvider<TableReference> table, - BigQueryServices bqServices) { - super(stepUuid, bqServices); + BigQueryServices bqServices, + Coder<T> coder, + SerializableFunction<SchemaAndRecord, T> parseFn + ) { + super(stepUuid, bqServices, coder, parseFn); this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson()); this.tableSizeBytes = new AtomicReference<>(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java new file mode 100644 index 0000000..e6811ef --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableSchema; +import org.apache.avro.generic.GenericRecord; + +/** + * A wrapper for a {@link GenericRecord} and the {@link TableSchema} representing the schema of the + * table (or query) it was generated from. 
+ */ +public class SchemaAndRecord { + private final GenericRecord record; + private final TableSchema tableSchema; + + public SchemaAndRecord(GenericRecord record, TableSchema tableSchema) { + this.record = record; + this.tableSchema = tableSchema; + } + + public GenericRecord getRecord() { + return record; + } + + public TableSchema getTableSchema() { + return tableSchema; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 8aac417..748d87f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -86,8 +86,10 @@ public class GcpApiSurfaceTest { classesInPackage("io.grpc"), classesInPackage("java"), classesInPackage("javax"), + classesInPackage("org.apache.avro"), classesInPackage("org.apache.beam"), classesInPackage("org.apache.commons.logging"), + classesInPackage("org.codehaus.jackson"), classesInPackage("org.joda.time")); assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses)); http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index a7e1cb9..aa818c6 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -48,6 +48,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.bigtable.v2.Mutation; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; @@ -56,6 +57,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.google.protobuf.ByteString; import java.io.File; import java.io.FileFilter; import java.io.IOException; @@ -78,10 +80,13 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; @@ -107,6 +112,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -124,6 +130,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.ShardedKey; @@ -395,30 +402,35 @@ public class BigQueryIOTest implements Serializable { } @Test - public void testReadFromTableOldSource() throws IOException, InterruptedException { - testReadFromTable(false); + public void testReadFromTableWithoutTemplateCompatibility() + throws IOException, InterruptedException { + testReadFromTable(false, false); + } + + @Test + public void testReadFromTableWithTemplateCompatibility() + throws IOException, InterruptedException { + testReadFromTable(true, false); + } + + @Test + public void testReadTableRowsFromTableWithoutTemplateCompatibility() + throws IOException, InterruptedException { + testReadFromTable(false, true); } @Test - public void testReadFromTableTemplateCompatibility() throws IOException, InterruptedException { - testReadFromTable(true); + public void testReadTableRowsFromTableWithTemplateCompatibility() + throws IOException, InterruptedException { + testReadFromTable(true, true); } - private void testReadFromTable(boolean useTemplateCompatibility) + private void testReadFromTable(boolean useTemplateCompatibility, boolean useReadTableRows) throws IOException, InterruptedException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultproject"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); - Job job = new Job(); - JobStatus status = new JobStatus(); - job.setStatus(status); - 
JobStatistics jobStats = new JobStatistics(); - job.setStatistics(jobStats); - JobStatistics4 extract = new JobStatistics4(); - jobStats.setExtract(extract); - extract.setDestinationUriFileCounts(ImmutableList.of(1L)); - Table sometable = new Table(); sometable.setSchema( new TableSchema() @@ -447,16 +459,24 @@ public class BigQueryIOTest implements Serializable { .withDatasetService(fakeDatasetService); Pipeline p = TestPipeline.create(bqOptions); - BigQueryIO.Read read = - BigQueryIO.read() - .from("non-executing-project:somedataset.sometable") - .withTestServices(fakeBqServices) - .withoutValidation(); - if (useTemplateCompatibility) { - read = read.withTemplateCompatibility(); + PTransform<PBegin, PCollection<TableRow>> readTransform; + if (useReadTableRows) { + BigQueryIO.Read read = + BigQueryIO.read() + .from("non-executing-project:somedataset.sometable") + .withTestServices(fakeBqServices) + .withoutValidation(); + readTransform = useTemplateCompatibility ? read.withTemplateCompatibility() : read; + } else { + BigQueryIO.TypedRead<TableRow> read = + BigQueryIO.readTableRows() + .from("non-executing-project:somedataset.sometable") + .withTestServices(fakeBqServices) + .withoutValidation(); + readTransform = useTemplateCompatibility ? 
read.withTemplateCompatibility() : read; } PCollection<KV<String, Long>> output = - p.apply(read) + p.apply(readTransform) .apply( ParDo.of( new DoFn<TableRow, KV<String, Long>>() { @@ -1650,7 +1670,11 @@ public class BigQueryIOTest implements Serializable { String stepUuid = "testStepUuid"; BoundedSource<TableRow> bqSource = BigQueryTableSource.create( - stepUuid, StaticValueProvider.of(table), fakeBqServices); + stepUuid, + StaticValueProvider.of(table), + fakeBqServices, + TableRowJsonCoder.of(), + BigQueryIO.TableRowParser.INSTANCE); PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(baseDir.toString()); @@ -1727,8 +1751,13 @@ public class BigQueryIOTest implements Serializable { String query = FakeBigQueryServices.encodeQuery(expected); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - stepUuid, StaticValueProvider.of(query), - true /* flattenResults */, true /* useLegacySql */, fakeBqServices); + stepUuid, + StaticValueProvider.of(query), + true /* flattenResults */, + true /* useLegacySql */, + fakeBqServices, + TableRowJsonCoder.of(), + BigQueryIO.TableRowParser.INSTANCE); options.setTempLocation(baseDir.toString()); TableReference queryTable = new TableReference() @@ -1813,7 +1842,11 @@ public class BigQueryIOTest implements Serializable { BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( stepUuid, StaticValueProvider.of(query), - true /* flattenResults */, true /* useLegacySql */, fakeBqServices); + true /* flattenResults */, + true /* useLegacySql */, + fakeBqServices, + TableRowJsonCoder.of(), + BigQueryIO.TableRowParser.INSTANCE); options.setTempLocation(baseDir.toString()); @@ -2375,4 +2408,19 @@ public class BigQueryIOTest implements Serializable { assertEquals("project:dataset.table", BigQueryHelpers.stripPartitionDecorator("project:dataset.table")); } + + @Test + public void testCoderInference() { + SerializableFunction<SchemaAndRecord, KV<ByteString, Mutation>> parseFn = + new 
SerializableFunction<SchemaAndRecord, KV<ByteString, Mutation>>() { + @Override + public KV<ByteString, Mutation> apply(SchemaAndRecord input) { + return null; + } + }; + + assertEquals( + KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(Mutation.class)), + BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault())); + } }