This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a679d98  [BEAM-6749] Update BQ tornadoes example to use the storage 
API. (#7958)
a679d98 is described below

commit a679d98cbcc49b01528c168cce8b578338a5bcdd
Author: Kenneth Jung <[email protected]>
AuthorDate: Wed Feb 27 13:10:05 2019 -0800

    [BEAM-6749] Update BQ tornadoes example to use the storage API. (#7958)
---
 .../beam/examples/cookbook/BigQueryTornadoes.java  | 35 +++++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
 
b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 1d557a3..a4356c7 100644
--- 
a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -20,10 +20,13 @@ package org.apache.beam.examples.cookbook;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -35,6 +38,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
 
 /**
  * An example that reads the public samples of weather data from BigQuery, 
counts the number of
@@ -136,6 +140,12 @@ public class BigQueryTornadoes {
 
     void setInput(String value);
 
+    @Description("Mode to use when reading from BigQuery")
+    @Default.Enum("EXPORT")
+    TypedRead.Method getReadMethod();
+
+    void setReadMethod(TypedRead.Method value);
+
     @Description(
         "BigQuery table to write to, specified as "
             + "<project_id>:<dataset_id>.<table_id>. The dataset must already 
exist.")
@@ -154,7 +164,30 @@ public class BigQueryTornadoes {
     fields.add(new 
TableFieldSchema().setName("tornado_count").setType("INTEGER"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.readTableRows().from(options.getInput()))
+    PCollection<TableRow> rowsFromBigQuery;
+
+    if (options.getReadMethod() == Method.DIRECT_READ) {
+      // Build the read options proto for the read operation.
+      TableReadOptions tableReadOptions =
+          TableReadOptions.newBuilder()
+              .addAllSelectedFields(Lists.newArrayList("month", "tornado"))
+              .build();
+
+      rowsFromBigQuery =
+          p.apply(
+              BigQueryIO.readTableRows()
+                  .from(options.getInput())
+                  .withMethod(Method.DIRECT_READ)
+                  .withReadOptions(tableReadOptions));
+    } else {
+      rowsFromBigQuery =
+          p.apply(
+              BigQueryIO.readTableRows()
+                  .from(options.getInput())
+                  .withMethod(options.getReadMethod()));
+    }
+
+    rowsFromBigQuery
         .apply(new CountTornadoes())
         .apply(
             BigQueryIO.writeTableRows()

Reply via email to