Repository: beam
Updated Branches:
  refs/heads/master dc597873e -> 6be9e0bb2


Support for using raw avro records from BigQuery


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c98c8049
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c98c8049
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c98c8049

Branch: refs/heads/master
Commit: c98c8049fa3607c4cadbabffeba4bde493225540
Parents: dc59787
Author: steve <sniem...@twitter.com>
Authored: Fri Sep 22 11:07:07 2017 -0400
Committer: Eugene Kirpichov <ekirpic...@gmail.com>
Committed: Thu Sep 28 16:05:39 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/BigQueryTornadoes.java    |   2 +-
 .../cookbook/CombinePerKeyExamples.java         |   2 +-
 .../beam/examples/cookbook/FilterExamples.java  |   2 +-
 .../beam/examples/cookbook/JoinExamples.java    |   6 +-
 .../examples/cookbook/MaxPerKeyExamples.java    |   2 +-
 .../org/apache/beam/sdk/io/package-info.java    |   2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 407 ++++++++++++++-----
 .../io/gcp/bigquery/BigQueryQuerySource.java    |  22 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |  69 ++--
 .../io/gcp/bigquery/BigQueryTableSource.java    |  19 +-
 .../sdk/io/gcp/bigquery/SchemaAndRecord.java    |  43 ++
 .../beam/sdk/io/gcp/GcpApiSurfaceTest.java      |   2 +
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 100 +++--
 13 files changed, 498 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
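Taken together, the changes below replace the TableRow-only read path with a typed read over BigQuery's Avro export; readTableRows() remains for the common TableRow case, and the cookbook examples below are migrated to it. A rough usage sketch of the new API (the WeatherRecord class and its fields are hypothetical, not part of this commit):

    // Parse BigQuery's Avro export directly into a domain type.
    // WeatherRecord is a hypothetical Serializable POJO; the field names are
    // illustrative only.
    PCollection<WeatherRecord> rows = pipeline.apply(
        BigQueryIO
            .read(new SerializableFunction<SchemaAndRecord, WeatherRecord>() {
              @Override
              public WeatherRecord apply(SchemaAndRecord input) {
                GenericRecord r = input.getRecord();
                // Avro decodes STRING columns as Utf8, so convert explicitly.
                return new WeatherRecord(
                    String.valueOf(r.get("station_id")),
                    (Double) r.get("mean_temp"));
              }
            })
            .from("clouddataflow-readonly:samples.weather_stations")
            // Needed when the coder cannot be inferred from the parse function.
            .withCoder(SerializableCoder.of(WeatherRecord.class)));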


http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 07a3edd..df9ff5a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -156,7 +156,7 @@ public class BigQueryTornadoes {
     fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.read().from(options.getInput()))
+    p.apply(BigQueryIO.readTableRows().from(options.getInput()))
      .apply(new CountTornadoes())
      .apply(BigQueryIO.writeTableRows()
          .to(options.getOutput())

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 693f0c4..1e91aec 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -195,7 +195,7 @@ public class CombinePerKeyExamples {
     fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.read().from(options.getInput()))
+    p.apply(BigQueryIO.readTableRows().from(options.getInput()))
      .apply(new PlaysForWord())
      .apply(BigQueryIO.writeTableRows()
         .to(options.getOutput())

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index fed9db7..a4fe425 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -237,7 +237,7 @@ public class FilterExamples {
 
     TableSchema schema = buildWeatherSchemaProjection();
 
-    p.apply(BigQueryIO.read().from(options.getInput()))
+    p.apply(BigQueryIO.readTableRows().from(options.getInput()))
      .apply(ParDo.of(new ProjectionFn()))
      .apply(new BelowGlobalMean(options.getMonthFilter()))
      .apply(BigQueryIO.writeTableRows()

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index d1fffb4..ae8c59c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -166,8 +166,10 @@ public class JoinExamples {
     Pipeline p = Pipeline.create(options);
     // the following two 'applys' create multiple inputs to our pipeline, one for each
     // of our two input sources.
-    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
-    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
+    PCollection<TableRow> eventsTable = p.apply(
+        BigQueryIO.readTableRows().from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(
+        BigQueryIO.readTableRows().from(COUNTRY_CODES));
     PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
     formattedResults.apply(TextIO.write().to(options.getOutput()));
     p.run().waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index 295b3f4..992580e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -149,7 +149,7 @@ public class MaxPerKeyExamples {
     fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.read().from(options.getInput()))
+    p.apply(BigQueryIO.readTableRows().from(options.getInput()))
      .apply(new MaxMeanTemp())
      .apply(BigQueryIO.writeTableRows()
         .to(options.getOutput())

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index 3fc8e32..dd6d009 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -24,7 +24,7 @@
  * from existing storage:
  * <pre>{@code
  * PCollection<TableRow> inputData = pipeline.apply(
- *     BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
+ *     BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));
  * }</pre>
  * and {@code Write} transforms that persist PCollections to external storage:
  * <pre> {@code

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 9e1dbfe..2771687 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -45,11 +45,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
@@ -93,6 +95,8 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -125,28 +129,44 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Reading</h3>
  *
- * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation. This produces a
- * {@link PCollection} of {@link TableRow TableRows} as output:
+ * <p>Reading from BigQuery is supported by {@link #read(SerializableFunction)}, which parses
+ * records in <a href="https://cloud.google.com/bigquery/data-formats#avro_format">AVRO format</a>
+ * into a custom type using a specified parse function, and by {@link #readTableRows}, which parses
+ * them into {@link TableRow}; this is more convenient in some cases but has lower performance.
  *
+ * <p>Both functions support reading either from a table or from the result of a query, via
+ * {@link TypedRead#from(String)} and {@link TypedRead#fromQuery} respectively. Exactly one
+ * of these must be specified.
+ *
+ * <b>Example: Reading rows of a table as {@link TableRow}.</b>
  * <pre>{@code
  * PCollection<TableRow> weatherData = pipeline.apply(
- *     BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
+ *     BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));
  * }</pre>
  *
- * <p>See {@link TableRow} for more information on the {@link TableRow} object.
+ * <b>Example: Reading rows of a table and parsing them into a custom type.</b>
+ * <pre>{@code
+ * PCollection<WeatherRecord> weatherData = pipeline.apply(
+ *    BigQueryIO
+ *      .read(new SerializableFunction<SchemaAndRecord, WeatherRecord>() {
+ *        public WeatherRecord apply(SchemaAndRecord schemaAndRecord) {
+ *          return new WeatherRecord(...);
+ *        }
+ *      })
+ *      .from("clouddataflow-readonly:samples.weather_stations")
+ *      .withCoder(SerializableCoder.of(WeatherRecord.class)));
+ * }</pre>
  *
- * <p>Users may provide a query to read from rather than reading all of a BigQuery table. If
- * specified, the result obtained by executing the specified query will be used as the data of the
- * input transform.
+ * <p>Note: When using {@link #read(SerializableFunction)}, you may sometimes need to use
+ * {@link TypedRead#withCoder(Coder)} to specify a {@link Coder} for the result type, if Beam
+ * fails to infer it automatically.
  *
+ * <b>Example: Reading results of a query as {@link TableRow}.</b>
  * <pre>{@code
- * PCollection<TableRow> meanTemperatureData = pipeline.apply(
- *     BigQueryIO.read().fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
+ * PCollection<TableRow> meanTemperatureData = pipeline.apply(BigQueryIO.readTableRows()
+ *     .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
  * }</pre>
  *
- * <p>When creating a BigQuery input transform, users should provide either a query or a table.
- * Pipeline construction will fail with a validation error if neither or both are specified.
- *
  * <h3>Writing</h3>
  *
  * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes
@@ -288,67 +308,116 @@ public class BigQueryIO {
       };
 
   /**
-   * A {@link PTransform} that reads from a BigQuery table and returns a
-   * {@link PCollection} of {@link TableRow TableRows} containing each of the rows of the table.
+   * @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead.
+   * {@link #readTableRows()} does exactly the same as {@link #read()};
+   * {@link #read(SerializableFunction)} performs better.
+   */
+  @Deprecated
+  public static Read read() {
+    return new Read();
+  }
+
+  /**
+   * Like {@link #read(SerializableFunction)} but represents each row as a {@link TableRow}.
+   *
+   * <p>This method is more convenient to use in some cases, but usually has significantly lower
+   * performance than using {@link #read(SerializableFunction)} directly to parse data into a
+   * domain-specific type, due to the overhead of converting the rows to {@link TableRow}.
+   */
+  public static TypedRead<TableRow> readTableRows() {
+    return read(new TableRowParser()).withCoder(TableRowJsonCoder.of());
+  }
+
+  /**
+   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element for
+   * each row of the table or query result, parsed from the BigQuery AVRO format using the
+   * specified function.
    *
-   * <p>Each {@link TableRow} contains values indexed by column name. Here is a
-   * sample processing function that processes a "line" column from rows:
+   * <p>Each {@link SchemaAndRecord} contains a BigQuery {@link TableSchema} and a
+   * {@link GenericRecord} representing the row, indexed by column name. Here is a
+   * sample parse function that parses click events from a table.
    *
    * <pre>{@code
-   * static class ExtractWordsFn extends DoFn<TableRow, String> {
-   *   public void processElement(ProcessContext c) {
-   *     // Get the "line" field of the TableRow object, split it into words, and emit them.
-   *     TableRow row = c.element();
-   *     String[] words = row.get("line").toString().split("[^a-zA-Z']+");
-   *     for (String word : words) {
-   *       if (!word.isEmpty()) {
-   *         c.output(word);
-   *       }
-   *     }
+   * p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, ClickEvent>() {
+   *   public ClickEvent apply(SchemaAndRecord record) {
+   *     GenericRecord r = record.getRecord();
+   *     return new ClickEvent((Long) r.get("userId"), (String) r.get("url"));
    *   }
-   * }
+   * }).from("..."));
    * }</pre>
    */
-  public static Read read() {
-    return new AutoValue_BigQueryIO_Read.Builder()
+  public static <T> TypedRead<T> read(
+      SerializableFunction<SchemaAndRecord, T> parseFn) {
+    return new AutoValue_BigQueryIO_TypedRead.Builder<T>()
         .setValidate(true)
         .setWithTemplateCompatibility(false)
         .setBigQueryServices(new BigQueryServicesImpl())
+        .setParseFn(parseFn)
         .build();
   }
 
-  /** Implementation of {@link #read}. */
-  @AutoValue
-  public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
-    @Nullable abstract ValueProvider<String> getJsonTableRef();
-    @Nullable abstract ValueProvider<String> getQuery();
-    abstract boolean getValidate();
-    @Nullable abstract Boolean getFlattenResults();
-    @Nullable abstract Boolean getUseLegacySql();
+  @VisibleForTesting
+  static class TableRowParser
+      implements SerializableFunction<SchemaAndRecord, TableRow> {
 
-    abstract Boolean getWithTemplateCompatibility();
+    public static final TableRowParser INSTANCE = new TableRowParser();
 
-    abstract BigQueryServices getBigQueryServices();
-    abstract Builder toBuilder();
+    public TableRow apply(SchemaAndRecord schemaAndRecord) {
+      return BigQueryAvroUtils.convertGenericRecordToTableRow(
+          schemaAndRecord.getRecord(),
+          schemaAndRecord.getTableSchema());
+    }
+  }
 
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
-      abstract Builder setQuery(ValueProvider<String> query);
-      abstract Builder setValidate(boolean validate);
-      abstract Builder setFlattenResults(Boolean flattenResults);
-      abstract Builder setUseLegacySql(Boolean useLegacySql);
+  /** Implementation of {@link BigQueryIO#read()}. */
+  public static class Read
+      extends PTransform<PBegin, PCollection<TableRow>> {
+    private final TypedRead<TableRow> inner;
 
-      abstract Builder setWithTemplateCompatibility(Boolean useTemplateCompatibility);
+    Read() {
+      this(BigQueryIO.read(TableRowParser.INSTANCE).withCoder(TableRowJsonCoder.of()));
+    }
 
-      abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
-      abstract Read build();
+    Read(TypedRead<TableRow> inner) {
+      this.inner = inner;
     }
 
-    /** Ensures that methods of the from() / fromQuery() family are called at most once. */
-    private void ensureFromNotCalledYet() {
-      checkState(
-          getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called");
+    @Override
+    public PCollection<TableRow> expand(PBegin input) {
+      return input.apply(inner);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      this.inner.populateDisplayData(builder);
+    }
+
+    boolean getValidate() {
+      return this.inner.getValidate();
+    }
+
+    ValueProvider<String> getQuery() {
+      return this.inner.getQuery();
+    }
+
+    Read withTestServices(BigQueryServices testServices) {
+      return new Read(this.inner.withTestServices(testServices));
+    }
+
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * Returns the table to read, or {@code null} if reading from a query instead.
+     */
+    @Nullable
+    public ValueProvider<TableReference> getTableProvider() {
+      return this.inner.getTableProvider();
+    }
+
+    /** Returns the table to read, or {@code null} if reading from a query instead. */
+    @Nullable
+    public TableReference getTable() {
+      return this.inner.getTable();
     }
 
     /**
@@ -356,18 +425,19 @@ public class BigQueryIO {
      * {@code "[dataset_id].[table_id]"} for tables within the current project.
      */
     public Read from(String tableSpec) {
-      return from(StaticValueProvider.of(tableSpec));
+      return new Read(this.inner.from(tableSpec));
     }
 
     /** Same as {@code from(String)}, but with a {@link ValueProvider}. */
     public Read from(ValueProvider<String> tableSpec) {
-      ensureFromNotCalledYet();
-      return toBuilder()
-          .setJsonTableRef(
-              NestedValueProvider.of(
-                  NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
-                  new TableRefToJson()))
-          .build();
+      return new Read(this.inner.from(tableSpec));
+    }
+
+    /**
+     * Read from table specified by a {@link TableReference}.
+     */
+    public Read from(TableReference table) {
+      return new Read(this.inner.from(table));
     }
 
     /**
@@ -375,40 +445,28 @@ public class BigQueryIO {
      *
      * <p>By default, the query results will be flattened -- see "flattenResults" in the <a
      * href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">Jobs documentation</a> for
-     * more information. To disable flattening, use {@link BigQueryIO.Read#withoutResultFlattening}.
+     * more information. To disable flattening, use {@link Read#withoutResultFlattening}.
      *
      * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery Standard
-     * SQL dialect, use {@link BigQueryIO.Read#usingStandardSql}.
+     * SQL dialect, use {@link Read#usingStandardSql}.
      */
     public Read fromQuery(String query) {
-      return fromQuery(StaticValueProvider.of(query));
+      return new Read(this.inner.fromQuery(query));
     }
 
     /**
      * Same as {@code fromQuery(String)}, but with a {@link ValueProvider}.
      */
     public Read fromQuery(ValueProvider<String> query) {
-      ensureFromNotCalledYet();
-      return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
+      return new Read(this.inner.fromQuery(query));
     }
 
     /**
-     * Read from table specified by a {@link TableReference}.
-     */
-    public Read from(TableReference table) {
-      return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));
-    }
-
-    private static final String QUERY_VALIDATION_FAILURE_ERROR =
-        "Validation of query \"%1$s\" failed. If the query depends on an 
earlier stage of the"
-        + " pipeline, This validation can be disabled using 
#withoutValidation.";
-
-    /**
      * Disable validation that the table exists or the query succeeds prior to 
pipeline submission.
      * Basic validation (such as ensuring that a query or table is specified) 
still occurs.
      */
     public Read withoutValidation() {
-      return toBuilder().setValidate(false).build();
+      return new Read(this.inner.withoutValidation());
     }
 
     /**
@@ -419,7 +477,7 @@ public class BigQueryIO {
      * from a table will cause an error during validation.
      */
     public Read withoutResultFlattening() {
-      return toBuilder().setFlattenResults(false).build();
+      return new Read(this.inner.withoutResultFlattening());
     }
 
     /**
@@ -429,7 +487,7 @@ public class BigQueryIO {
      * from a table will cause an error during validation.
      */
     public Read usingStandardSql() {
-      return toBuilder().setUseLegacySql(false).build();
+      return new Read(this.inner.usingStandardSql());
     }
 
     /**
@@ -440,26 +498,94 @@ public class BigQueryIO {
      */
     @Experimental(Experimental.Kind.SOURCE_SINK)
     public Read withTemplateCompatibility() {
-      return toBuilder().setWithTemplateCompatibility(true).build();
+      return new Read(this.inner.withTemplateCompatibility());
     }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Implementation of {@link BigQueryIO#read(SerializableFunction)}.
+   */
+  @AutoValue
+  public abstract static class TypedRead<T> extends PTransform<PBegin, PCollection<T>> {
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);
+      abstract Builder<T> setQuery(ValueProvider<String> query);
+      abstract Builder<T> setValidate(boolean validate);
+      abstract Builder<T> setFlattenResults(Boolean flattenResults);
+      abstract Builder<T> setUseLegacySql(Boolean useLegacySql);
+      abstract Builder<T> setWithTemplateCompatibility(Boolean useTemplateCompatibility);
+      abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
+      abstract TypedRead<T> build();
+
+      abstract Builder<T> setParseFn(
+          SerializableFunction<SchemaAndRecord, T> parseFn);
+      abstract Builder<T> setCoder(Coder<T> coder);
+    }
+
+    @Nullable abstract ValueProvider<String> getJsonTableRef();
+    @Nullable abstract ValueProvider<String> getQuery();
+    abstract boolean getValidate();
+    @Nullable abstract Boolean getFlattenResults();
+    @Nullable abstract Boolean getUseLegacySql();
+
+    abstract Boolean getWithTemplateCompatibility();
+
+    abstract BigQueryServices getBigQueryServices();
+
+    abstract SerializableFunction<SchemaAndRecord, T> getParseFn();
+
+    @Nullable abstract Coder<T> getCoder();
 
     @VisibleForTesting
-    Read withTestServices(BigQueryServices testServices) {
-      return toBuilder().setBigQueryServices(testServices).build();
+    Coder<T> inferCoder(CoderRegistry coderRegistry) {
+      if (getCoder() != null) {
+        return getCoder();
+      }
+
+      TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(getParseFn());
+
+      String message =
+          "Unable to infer coder for output of parseFn. Specify it explicitly 
using withCoder().";
+      checkArgument(descriptor != null, message);
+      try {
+        return coderRegistry.getCoder(descriptor);
+      } catch (CannotProvideCoderException e) {
+        throw new IllegalArgumentException(message, e);
+      }
     }
 
-    private BigQuerySourceBase createSource(String jobUuid) {
-      BigQuerySourceBase source;
+    private BigQuerySourceBase<T> createSource(String jobUuid, Coder<T> coder) {
+      BigQuerySourceBase<T> source;
       if (getQuery() == null) {
-        source = BigQueryTableSource.create(jobUuid, getTableProvider(), getBigQueryServices());
+        source = BigQueryTableSource.create(
+            jobUuid,
+            getTableProvider(),
+            getBigQueryServices(),
+            coder,
+            getParseFn());
       } else {
         source =
             BigQueryQuerySource.create(
-                jobUuid, getQuery(), getFlattenResults(), getUseLegacySql(), getBigQueryServices());
+                jobUuid,
+                getQuery(),
+                getFlattenResults(),
+                getUseLegacySql(),
+                getBigQueryServices(),
+                coder,
+                getParseFn());
       }
       return source;
     }
 
+    private static final String QUERY_VALIDATION_FAILURE_ERROR =
+        "Validation of query \"%1$s\" failed. If the query depends on an 
earlier stage of the"
+            + " pipeline, This validation can be disabled using 
#withoutValidation.";
+
     @Override
     public void validate(PipelineOptions options) {
       // Even if existence validation is disabled, we need to make sure that the BigQueryIO
@@ -516,7 +642,7 @@ public class BigQueryIO {
     }
 
     @Override
-    public PCollection<TableRow> expand(PBegin input) {
+    public PCollection<T> expand(PBegin input) {
       ValueProvider<TableReference> table = getTableProvider();
 
       if (table != null) {
@@ -541,11 +667,13 @@ public class BigQueryIO {
             getFlattenResults() != null, "flattenResults should not be null if query is set");
         checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
       }
+      checkArgument(getParseFn() != null, "A parseFn is required");
 
       Pipeline p = input.getPipeline();
+      final Coder<T> coder = inferCoder(p.getCoderRegistry());
       final PCollectionView<String> jobIdTokenView;
-      PCollection<String> jobIdTokenCollection = null;
-      PCollection<TableRow> rows;
+      PCollection<String> jobIdTokenCollection;
+      PCollection<T> rows;
       if (!getWithTemplateCompatibility()) {
         // Create a singleton job ID token at construction time.
         final String staticJobUuid = BigQueryHelpers.randomUUIDString();
@@ -553,7 +681,7 @@ public class BigQueryIO {
             p.apply("TriggerIdCreation", Create.of(staticJobUuid))
                 .apply("ViewId", View.<String>asSingleton());
         // Apply the traditional Source model.
-        rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid)));
+        rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid, coder)));
       } else {
         // Create a singleton job ID token at execution time.
         jobIdTokenCollection =
@@ -579,7 +707,7 @@ public class BigQueryIO {
                           @ProcessElement
                           public void processElement(ProcessContext c) throws Exception {
                             String jobUuid = c.element();
-                            BigQuerySourceBase source = createSource(jobUuid);
+                            BigQuerySourceBase<T> source = createSource(jobUuid, coder);
                             BigQuerySourceBase.ExtractResult res =
                                 source.extractFiles(c.getPipelineOptions());
                             for (ResourceId file : res.extractedFiles) {
@@ -600,23 +728,23 @@ public class BigQueryIO {
                 .apply(
                     "ReadFiles",
                     ParDo.of(
-                            new DoFn<String, TableRow>() {
+                            new DoFn<String, T>() {
                               @ProcessElement
                              public void processElement(ProcessContext c) throws Exception {
                                TableSchema schema =
                                    BigQueryHelpers.fromJsonString(
                                        c.sideInput(schemaView), TableSchema.class);
                                String jobUuid = c.sideInput(jobIdTokenView);
-                                BigQuerySourceBase source = createSource(jobUuid);
-                                List<BoundedSource<TableRow>> sources =
+                                BigQuerySourceBase<T> source = createSource(jobUuid, coder);
+                                List<BoundedSource<T>> sources =
                                    source.createSources(
                                        ImmutableList.of(
                                            FileSystems.matchNewResource(
                                                c.element(), false /* is directory */)),
                                        schema);
                                checkArgument(sources.size() == 1, "Expected exactly one source.");
-                                BoundedSource<TableRow> avroSource = sources.get(0);
-                                BoundedSource.BoundedReader<TableRow> reader =
+                                BoundedSource<T> avroSource = sources.get(0);
+                                BoundedSource.BoundedReader<T> reader =
                                    avroSource.createReader(c.getPipelineOptions());
                                for (boolean more = reader.start(); more; more = reader.advance()) {
                                   c.output(reader.getCurrent());
@@ -624,7 +752,7 @@ public class BigQueryIO {
                               }
                             })
                         .withSideInputs(schemaView, jobIdTokenView))
-                        .setCoder(TableRowJsonCoder.of());
+                        .setCoder(coder);
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
           new PassThroughThenCleanup.CleanupOperation() {
@@ -653,7 +781,7 @@ public class BigQueryIO {
               }
             }
           };
-      return rows.apply(new PassThroughThenCleanup<TableRow>(cleanupOperation, jobIdTokenView));
+      return rows.apply(new PassThroughThenCleanup<T>(cleanupOperation, jobIdTokenView));
     }
 
     @Override
@@ -673,20 +801,91 @@ public class BigQueryIO {
               true);
     }
 
-    /**
-     * Returns the table to read, or {@code null} if reading from a query instead.
-     */
+    /** Ensures that methods of the from() / fromQuery() family are called at most once. */
+    private void ensureFromNotCalledYet() {
+      checkState(
+          getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called");
+    }
+
+    /** See {@link Read#getTableProvider()}. */
     @Nullable
     public ValueProvider<TableReference> getTableProvider() {
       return getJsonTableRef() == null
           ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
     }
-    /** Returns the table to read, or {@code null} if reading from a query instead. */
+
+    /** See {@link Read#getTable()}. */
     @Nullable
     public TableReference getTable() {
       ValueProvider<TableReference> provider = getTableProvider();
       return provider == null ? null : provider.get();
     }
+
+    /**
+     * Sets a {@link Coder} for the result of the parse function. This may be required if a coder
+     * cannot be inferred automatically.
+     */
+    public TypedRead<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** See {@link Read#from(String)}. */
+    public TypedRead<T> from(String tableSpec) {
+      return from(StaticValueProvider.of(tableSpec));
+    }
+
+    /** See {@link Read#from(ValueProvider)}. */
+    public TypedRead<T> from(ValueProvider<String> tableSpec) {
+      ensureFromNotCalledYet();
+      return toBuilder()
+          .setJsonTableRef(
+              NestedValueProvider.of(
+                  NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+                  new TableRefToJson()))
+          .build();
+    }
+
+    /** See {@link Read#fromQuery(String)}. */
+    public TypedRead<T> fromQuery(String query) {
+      return fromQuery(StaticValueProvider.of(query));
+    }
+
+    /** See {@link Read#fromQuery(ValueProvider)}. */
+    public TypedRead<T> fromQuery(ValueProvider<String> query) {
+      ensureFromNotCalledYet();
+      return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
+    }
+
+    /** See {@link Read#from(TableReference)}. */
+    public TypedRead<T> from(TableReference table) {
+      return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));
+    }
+
+    /** See {@link Read#withoutValidation()}. */
+    public TypedRead<T> withoutValidation() {
+      return toBuilder().setValidate(false).build();
+    }
+
+    /** See {@link Read#withoutResultFlattening()}. */
+    public TypedRead<T> withoutResultFlattening() {
+      return toBuilder().setFlattenResults(false).build();
+    }
+
+    /** See {@link Read#usingStandardSql()}. */
+    public TypedRead<T> usingStandardSql() {
+      return toBuilder().setUseLegacySql(false).build();
+    }
+
+    /** See {@link Read#withTemplateCompatibility()}. */
+    @Experimental(Experimental.Kind.SOURCE_SINK)
+    public TypedRead<T> withTemplateCompatibility() {
+      return toBuilder().setWithTemplateCompatibility(true).build();
+    }
+
+    @VisibleForTesting
+    TypedRead<T> withTestServices(BigQueryServices testServices) {
+      return toBuilder().setBigQueryServices(testServices).build();
+    }
   }
 
   static String getExtractDestinationUri(String extractDestinationDir) {

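A note on the inferCoder method added above: it resolves the parse function's output type with TypeDescriptors.outputOf and asks the pipeline's CoderRegistry for a matching coder, while withCoder() bypasses the lookup entirely. A standalone sketch of that resolution path (the word/count column names are illustrative):

    // Mirrors TypedRead.inferCoder: resolve the parse function's output type,
    // then consult the coder registry.
    SerializableFunction<SchemaAndRecord, KV<String, Long>> parseFn =
        new SerializableFunction<SchemaAndRecord, KV<String, Long>>() {
          @Override
          public KV<String, Long> apply(SchemaAndRecord input) {
            GenericRecord r = input.getRecord();
            return KV.of(String.valueOf(r.get("word")), (Long) r.get("count"));
          }
        };

    CoderRegistry registry = CoderRegistry.createDefault();
    try {
      // An anonymous class preserves its type arguments, so with the default
      // registry this yields KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()).
      Coder<KV<String, Long>> coder =
          registry.getCoder(TypeDescriptors.outputOf(parseFn));
    } catch (CannotProvideCoderException e) {
      // The case the "Unable to infer coder" message covers: the caller must
      // pass withCoder(...) instead.
      throw new IllegalArgumentException(e);
    }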
http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index b92f8cc..a2f8dd9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -32,11 +32,13 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,21 +48,25 @@ import org.slf4j.LoggerFactory;
  * A {@link BigQuerySourceBase} for querying BigQuery tables.
  */
 @VisibleForTesting
-class BigQueryQuerySource extends BigQuerySourceBase {
+class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySource.class);
 
-  static BigQueryQuerySource create(
+  static <T> BigQueryQuerySource<T> create(
       String stepUuid,
       ValueProvider<String> query,
       Boolean flattenResults,
       Boolean useLegacySql,
-      BigQueryServices bqServices) {
-    return new BigQueryQuerySource(
+      BigQueryServices bqServices,
+      Coder<T> coder,
+      SerializableFunction<SchemaAndRecord, T> parseFn) {
+    return new BigQueryQuerySource<T>(
         stepUuid,
         query,
         flattenResults,
         useLegacySql,
-        bqServices);
+        bqServices,
+        coder,
+        parseFn);
   }
 
   private final ValueProvider<String> query;
@@ -73,8 +79,10 @@ class BigQueryQuerySource extends BigQuerySourceBase {
       ValueProvider<String> query,
       Boolean flattenResults,
       Boolean useLegacySql,
-      BigQueryServices bqServices) {
-    super(stepUuid, bqServices);
+      BigQueryServices bqServices,
+      Coder<T> coder,
+      SerializableFunction<SchemaAndRecord, T> parseFn) {
+    super(stepUuid, bqServices, coder, parseFn);
     this.query = checkNotNull(query, "query");
     this.flattenResults = checkNotNull(flattenResults, "flattenResults");
     this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index a8e187e..ca900d6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -27,7 +27,6 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.Function;
 import com.google.common.base.Supplier;
@@ -63,7 +62,7 @@ import org.slf4j.LoggerFactory;
  * </ul>
  * ...
  */
-abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
+abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
   private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceBase.class);
 
   // The maximum number of retries to poll a BigQuery job.
@@ -72,11 +71,20 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   protected final String stepUuid;
   protected final BigQueryServices bqServices;
 
-  private transient List<BoundedSource<TableRow>> cachedSplitResult;
+  private transient List<BoundedSource<T>> cachedSplitResult;
+  private SerializableFunction<SchemaAndRecord, T> parseFn;
+  private Coder<T> coder;
 
-  BigQuerySourceBase(String stepUuid, BigQueryServices bqServices) {
+  BigQuerySourceBase(
+      String stepUuid,
+      BigQueryServices bqServices,
+      Coder<T> coder,
+      SerializableFunction<SchemaAndRecord, T> parseFn
+    ) {
     this.stepUuid = checkNotNull(stepUuid, "stepUuid");
     this.bqServices = checkNotNull(bqServices, "bqServices");
+    this.coder = checkNotNull(coder, "coder");
+    this.parseFn = checkNotNull(parseFn, "parseFn");
   }
 
   protected static class ExtractResult {
@@ -109,12 +117,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   }
 
   @Override
-  public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
-    throw new UnsupportedOperationException("BigQuery source must be split before being read");
-  }
-
-  @Override
-  public List<BoundedSource<TableRow>> split(
+  public List<BoundedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
     // split() can be called multiple times, e.g. Dataflow runner may call it multiple times
     // with different desiredBundleSizeBytes in case the split() call produces too many sources.
@@ -134,13 +137,18 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
 
   @Override
+  public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+    throw new UnsupportedOperationException("BigQuery source must be split before being read");
+  }
+
+  @Override
   public void validate() {
     // Do nothing, validation is done in BigQuery.Read.
   }
 
   @Override
-  public Coder<TableRow> getOutputCoder() {
-    return TableRowJsonCoder.of();
+  public Coder<T> getOutputCoder() {
+    return coder;
   }
 
   private List<ResourceId> executeExtract(
@@ -173,33 +181,34 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
   }
 
-  List<BoundedSource<TableRow>> createSources(List<ResourceId> files, TableSchema schema)
+  private static class TableSchemaFunction
+      implements Serializable, Function<String, TableSchema> {
+    @Nullable
+    @Override
+    public TableSchema apply(@Nullable String input) {
+      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
+    }
+  }
+
+  List<BoundedSource<T>> createSources(List<ResourceId> files, TableSchema schema)
       throws IOException, InterruptedException {
-    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
 
-    SerializableFunction<GenericRecord, TableRow> function =
-        new SerializableFunction<GenericRecord, TableRow>() {
+    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
+    SerializableFunction<GenericRecord, T> fnWrapper =
+        new SerializableFunction<GenericRecord, T>() {
           private Supplier<TableSchema> schema = Suppliers.memoize(
               Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema)));
 
           @Override
-          public TableRow apply(GenericRecord input) {
-            return BigQueryAvroUtils.convertGenericRecordToTableRow(input, schema.get());
-          }};
-
-    List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
+          public T apply(GenericRecord input) {
+            return parseFn.apply(new SchemaAndRecord(input, schema.get()));
+          }
+        };
+    List<BoundedSource<T>> avroSources = Lists.newArrayList();
     for (ResourceId file : files) {
       avroSources.add(
-          AvroSource.from(file.toString()).withParseFn(function, getOutputCoder()));
+          AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
     }
     return ImmutableList.copyOf(avroSources);
   }
-
-  private static class TableSchemaFunction implements Serializable, Function<String, TableSchema> {
-    @Nullable
-    @Override
-    public TableSchema apply(@Nullable String input) {
-      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
-    }
-  }
 }

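One detail in createSources above: TableSchema is not Serializable, so the schema travels to workers as a JSON string, and fnWrapper decodes it lazily, once per deserialized instance, using Guava suppliers. The same pattern in isolation (a sketch; the integer payload stands in for the JSON-encoded schema):

    import com.google.common.base.Function;
    import com.google.common.base.Supplier;
    import com.google.common.base.Suppliers;

    public class MemoizedDecodeDemo {
      public static void main(String[] args) {
        final String payload = "42"; // stands in for the JSON-encoded TableSchema

        Function<String, Integer> decode = new Function<String, Integer>() {
          @Override
          public Integer apply(String input) {
            System.out.println("decoding"); // runs once, not once per record
            return Integer.valueOf(input);
          }
        };

        // compose(f, s) applies f to s.get() lazily; memoize caches the result.
        Supplier<Integer> decoded =
            Suppliers.memoize(Suppliers.compose(decode, Suppliers.ofInstance(payload)));

        System.out.println(decoded.get()); // prints "decoding", then 42
        System.out.println(decoded.get()); // prints 42 from the cache
      }
    }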
http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 83a5066..f717cb7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -26,10 +26,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,14 +40,16 @@ import org.slf4j.LoggerFactory;
  * A {@link BigQuerySourceBase} for reading BigQuery tables.
  */
 @VisibleForTesting
-class BigQueryTableSource extends BigQuerySourceBase {
+class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class);
 
-  static BigQueryTableSource create(
+  static <T> BigQueryTableSource<T> create(
       String stepUuid,
       ValueProvider<TableReference> table,
-      BigQueryServices bqServices) {
-    return new BigQueryTableSource(stepUuid, table, bqServices);
+      BigQueryServices bqServices,
+      Coder<T> coder,
+      SerializableFunction<SchemaAndRecord, T> parseFn) {
+    return new BigQueryTableSource<>(stepUuid, table, bqServices, coder, parseFn);
   }
 
   private final ValueProvider<String> jsonTable;
@@ -54,8 +58,11 @@ class BigQueryTableSource extends BigQuerySourceBase {
   private BigQueryTableSource(
       String stepUuid,
       ValueProvider<TableReference> table,
-      BigQueryServices bqServices) {
-    super(stepUuid, bqServices);
+      BigQueryServices bqServices,
+      Coder<T> coder,
+      SerializableFunction<SchemaAndRecord, T> parseFn
+  ) {
+    super(stepUuid, bqServices, coder, parseFn);
     this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
     this.tableSizeBytes = new AtomicReference<>();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java
new file mode 100644
index 0000000..e6811ef
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaAndRecord.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * A wrapper for a {@link GenericRecord} and the {@link TableSchema} representing the schema of the
+ * table (or query) it was generated from.
+ */
+public class SchemaAndRecord {
+  private final GenericRecord record;
+  private final TableSchema tableSchema;
+
+  public SchemaAndRecord(GenericRecord record, TableSchema tableSchema) {
+    this.record = record;
+    this.tableSchema = tableSchema;
+  }
+
+  public GenericRecord getRecord() {
+    return record;
+  }
+
+  public TableSchema getTableSchema() {
+    return tableSchema;
+  }
+}

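A parse function consuming this wrapper typically pulls typed fields out of the GenericRecord, and can consult getTableSchema() for column metadata when the layout is not known in advance. A sketch (the "name" and "age" columns are hypothetical):

    // A hypothetical parse function over SchemaAndRecord.
    SerializableFunction<SchemaAndRecord, String> parseFn =
        new SerializableFunction<SchemaAndRecord, String>() {
          @Override
          public String apply(SchemaAndRecord input) {
            GenericRecord r = input.getRecord();
            // Avro decodes BigQuery STRING columns as org.apache.avro.util.Utf8,
            // so convert explicitly rather than casting to String.
            String name = String.valueOf(r.get("name"));
            // BigQuery INTEGER maps to Avro long.
            Long age = (Long) r.get("age");
            return name + ":" + age;
          }
        };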
http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index 8aac417..748d87f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -86,8 +86,10 @@ public class GcpApiSurfaceTest {
             classesInPackage("io.grpc"),
             classesInPackage("java"),
             classesInPackage("javax"),
+            classesInPackage("org.apache.avro"),
             classesInPackage("org.apache.beam"),
             classesInPackage("org.apache.commons.logging"),
+            classesInPackage("org.codehaus.jackson"),
             classesInPackage("org.joda.time"));
 
     assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));

http://git-wip-us.apache.org/repos/asf/beam/blob/c98c8049/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index a7e1cb9..aa818c6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -48,6 +48,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
+import com.google.bigtable.v2.Mutation;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
@@ -56,6 +57,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
@@ -78,10 +80,13 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
@@ -107,6 +112,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -124,6 +130,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.ShardedKey;
@@ -395,30 +402,35 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testReadFromTableOldSource() throws IOException, InterruptedException {
-    testReadFromTable(false);
+  public void testReadFromTableWithoutTemplateCompatibility()
+      throws IOException, InterruptedException {
+    testReadFromTable(false, false);
+  }
+
+  @Test
+  public void testReadFromTableWithTemplateCompatibility()
+      throws IOException, InterruptedException {
+    testReadFromTable(true, false);
+  }
+
+  @Test
+  public void testReadTableRowsFromTableWithoutTemplateCompatibility()
+      throws IOException, InterruptedException {
+    testReadFromTable(false, true);
   }
 
   @Test
-  public void testReadFromTableTemplateCompatibility() throws IOException, InterruptedException {
-    testReadFromTable(true);
+  public void testReadTableRowsFromTableWithTemplateCompatibility()
+      throws IOException, InterruptedException {
+    testReadFromTable(true, true);
   }
 
-  private void testReadFromTable(boolean useTemplateCompatibility)
+  private void testReadFromTable(boolean useTemplateCompatibility, boolean useReadTableRows)
       throws IOException, InterruptedException {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
-    Job job = new Job();
-    JobStatus status = new JobStatus();
-    job.setStatus(status);
-    JobStatistics jobStats = new JobStatistics();
-    job.setStatistics(jobStats);
-    JobStatistics4 extract = new JobStatistics4();
-    jobStats.setExtract(extract);
-    extract.setDestinationUriFileCounts(ImmutableList.of(1L));
-
     Table sometable = new Table();
     sometable.setSchema(
         new TableSchema()
@@ -447,16 +459,24 @@ public class BigQueryIOTest implements Serializable {
         .withDatasetService(fakeDatasetService);
 
     Pipeline p = TestPipeline.create(bqOptions);
-    BigQueryIO.Read read =
-        BigQueryIO.read()
-            .from("non-executing-project:somedataset.sometable")
-            .withTestServices(fakeBqServices)
-            .withoutValidation();
-    if (useTemplateCompatibility) {
-      read = read.withTemplateCompatibility();
+    PTransform<PBegin, PCollection<TableRow>> readTransform;
+    if (useReadTableRows) {
+      BigQueryIO.Read read =
+          BigQueryIO.read()
+              .from("non-executing-project:somedataset.sometable")
+              .withTestServices(fakeBqServices)
+              .withoutValidation();
+      readTransform = useTemplateCompatibility ? read.withTemplateCompatibility() : read;
+    } else {
+      BigQueryIO.TypedRead<TableRow> read =
+          BigQueryIO.readTableRows()
+              .from("non-executing-project:somedataset.sometable")
+              .withTestServices(fakeBqServices)
+              .withoutValidation();
+      readTransform = useTemplateCompatibility ? read.withTemplateCompatibility() : read;
     }
     PCollection<KV<String, Long>> output =
-        p.apply(read)
+        p.apply(readTransform)
             .apply(
                 ParDo.of(
                     new DoFn<TableRow, KV<String, Long>>() {
@@ -1650,7 +1670,11 @@ public class BigQueryIOTest implements Serializable {
 
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        stepUuid, StaticValueProvider.of(table), fakeBqServices);
+        stepUuid,
+        StaticValueProvider.of(table),
+        fakeBqServices,
+        TableRowJsonCoder.of(),
+        BigQueryIO.TableRowParser.INSTANCE);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(baseDir.toString());
@@ -1727,8 +1751,13 @@ public class BigQueryIOTest implements Serializable {
 
     String query = FakeBigQueryServices.encodeQuery(expected);
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        stepUuid, StaticValueProvider.of(query),
-        true /* flattenResults */, true /* useLegacySql */, fakeBqServices);
+        stepUuid,
+        StaticValueProvider.of(query),
+        true /* flattenResults */,
+        true /* useLegacySql */,
+        fakeBqServices,
+        TableRowJsonCoder.of(),
+        BigQueryIO.TableRowParser.INSTANCE);
     options.setTempLocation(baseDir.toString());
 
     TableReference queryTable = new TableReference()
@@ -1813,7 +1842,11 @@ public class BigQueryIOTest implements Serializable {
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
         stepUuid,
         StaticValueProvider.of(query),
-        true /* flattenResults */, true /* useLegacySql */, fakeBqServices);
+        true /* flattenResults */,
+        true /* useLegacySql */,
+        fakeBqServices,
+        TableRowJsonCoder.of(),
+        BigQueryIO.TableRowParser.INSTANCE);
 
     options.setTempLocation(baseDir.toString());
 
@@ -2375,4 +2408,19 @@ public class BigQueryIOTest implements Serializable {
     assertEquals("project:dataset.table",
         BigQueryHelpers.stripPartitionDecorator("project:dataset.table"));
   }
+
+  @Test
+  public void testCoderInference() {
+    SerializableFunction<SchemaAndRecord, KV<ByteString, Mutation>> parseFn =
+      new SerializableFunction<SchemaAndRecord, KV<ByteString, Mutation>>() {
+        @Override
+        public KV<ByteString, Mutation> apply(SchemaAndRecord input) {
+          return null;
+        }
+      };
+
+    assertEquals(
+        KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(Mutation.class)),
+        BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault()));
+  }
 }
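testCoderInference above exercises the happy path, where the default registry can produce a coder for the parse function's output type. When it cannot, inferCoder fails with the "Unable to infer coder" message quoted earlier, and the caller must supply one explicitly. A sketch with a hypothetical Serializable POJO:

    // MyRecord is hypothetical; SerializableCoder applies because it
    // implements Serializable.
    class MyRecord implements Serializable {
      final String name;
      MyRecord(String name) { this.name = name; }
    }

    BigQueryIO.TypedRead<MyRecord> read =
        BigQueryIO
            .read(new SerializableFunction<SchemaAndRecord, MyRecord>() {
              @Override
              public MyRecord apply(SchemaAndRecord input) {
                return new MyRecord(String.valueOf(input.getRecord().get("name")));
              }
            })
            .from("project:dataset.table")
            // Pins the coder instead of relying on registry inference.
            .withCoder(SerializableCoder.of(MyRecord.class));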
