This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 833261f0a04 Add minimal example for BigQuery usage (#26713)
833261f0a04 is described below

commit 833261f0a041e04ca4dcb8719424fda09edcc5e6
Author: Timur Sultanov <timur.sulta...@akvelon.com>
AuthorDate: Tue Jun 6 23:38:38 2023 +0400

    Add minimal example for BigQuery usage (#26713)
    
    * Add minimal example for BigQuery usage
    
    * Add project name to default pipeline options
    
    * Add role for Service account - BigQuery
    
    * BigQuery DataViewer role added
    
    * Update MinimalBigQueryTornadoes.java
    
    ---------
    
    Co-authored-by: Sergey Makarkin <sergey.makar...@akvelon.com>
---
 .../cookbook/MinimalBigQueryTornadoes.java         | 142 +++++++++++++++++++++
 .../cookbook/MinimalBigQueryTornadoesTest.java     |  82 ++++++++++++
 playground/terraform/infrastructure/setup/iam.tf   |   2 +-
 3 files changed, 225 insertions(+), 1 deletion(-)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java
new file mode 100644
index 00000000000..43654ea4a05
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// beam-playground:
+//   name: MinimalBigQueryTornadoes
+//   description: An example that reads the public samples of weather data from BigQuery.
+//   multifile: false
+//   never_run: true
+//   always_run: true
+//   default_example: false
+//   pipeline_options: --project apache-beam-testing
+//   context_line: 102
+//   categories:
+//     - Filtering
+//     - IO
+//     - Core Transforms
+//   complexity: BASIC
+//   tags:
+//     - filter
+//     - bigquery
+//     - strings
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, counts the number of
+ * tornadoes that occur in each month, and writes the formatted results to the log and a text file.
+ *
+ * <p>Concepts: Reading from BigQuery; counting a PCollection; user-defined PTransforms
+ *
+ * <p>The BigQuery input is taken from {@code clouddataflow-readonly:samples.weather_stations}
+ */
+public class MinimalBigQueryTornadoes {
+  private static final Logger LOG = LoggerFactory.getLogger(MinimalBigQueryTornadoes.class);
+
+  // Use a 1000 row subset of the public weather station table publicdata:samples.gsod.
+  private static final String WEATHER_SAMPLES_TABLE =
+      "clouddataflow-readonly:samples.weather_stations";
+
+  /**
+   * Examines each row in the input table. If a tornado was recorded in that sample, the month in
+   * which it occurred is output.
+   */
+  static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      if ((Boolean) row.get("tornado")) {
+        c.output(Integer.parseInt((String) row.get("month")));
+      }
+    }
+  }
+
+  /**
+   * Prepares the data for output by formatting each month and its tornado count as a printable
+   * string, for example "6: 2".
+   */
+  static class FormatCountsFn extends DoFn<KV<Integer, Long>, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + ": " + c.element().getValue());
+    }
+  }
+
+  public static void applyBigQueryTornadoes(Pipeline p) {
+    TypedRead<TableRow> bigqueryIO =
+        BigQueryIO.readTableRows()
+            .from(WEATHER_SAMPLES_TABLE)
+            .withMethod(TypedRead.Method.DIRECT_READ)
+            .withSelectedFields(Lists.newArrayList("month", "tornado"));
+
+    PCollection<TableRow> rowsFromBigQuery = p.apply(bigqueryIO);
+
+    rowsFromBigQuery
+        // Extract rows which include information on tornadoes per month.
+        .apply(ParDo.of(new ExtractTornadoesFn()))
+        // Count the number of times each month appears in the data.
+        .apply(Count.perElement())
+        // Format each month and count into a printable string.
+        .apply(ParDo.of(new FormatCountsFn()))
+        // Write the formatted results to the log.
+        .apply(ParDo.of(new LogOutput<>("Result: ")))
+        // Write the formatted results to a file.
+        .apply(TextIO.write().to("tornadoes"));
+  }
+
+  public static void runBigQueryTornadoes(PipelineOptions options) {
+    Pipeline p = Pipeline.create(options);
+    applyBigQueryTornadoes(p);
+    p.run().waitUntilFinish();
+  }
+
+  public static void main(String[] args) {
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
+    runBigQueryTornadoes(options);
+  }
+
+  static class LogOutput<T> extends DoFn<T, T> {
+    private final String prefix;
+
+    LogOutput(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      LOG.info(prefix + c.element());
+      c.output(c.element());
+    }
+  }
+}
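
For reference only (not part of this commit): a minimal sketch of launching the example above programmatically rather than via command-line flags. The launcher class name and the "my-gcp-project" id are placeholders, and it assumes the GCP extensions module that provides GcpOptions is on the classpath.

package org.apache.beam.examples.cookbook;

import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Hypothetical launcher for MinimalBigQueryTornadoes; not part of the commit above. */
public class MinimalBigQueryTornadoesLauncher {
  public static void main(String[] args) {
    // Parse any flags that were passed, then fall back to a placeholder project id
    // so the BigQuery read session has a project to run against.
    GcpOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(GcpOptions.class);
    if (options.getProject() == null) {
      options.setProject("my-gcp-project"); // placeholder, replace with a real project id
    }
    MinimalBigQueryTornadoes.runBigQueryTornadoes(options);
  }
}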
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoesTest.java
new file mode 100644
index 00000000000..6591f03adb3
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoesTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.examples.cookbook.MinimalBigQueryTornadoes.ExtractTornadoesFn;
+import org.apache.beam.examples.cookbook.MinimalBigQueryTornadoes.FormatCountsFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test case for {@link MinimalBigQueryTornadoes}. */
+@RunWith(JUnit4.class)
+public class MinimalBigQueryTornadoesTest {
+  @Rule public TestPipeline p = TestPipeline.create();
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testExtractTornadoes() {
+    TableRow row = new TableRow().set("month", "6").set("tornado", true);
+    PCollection<TableRow> input = p.apply(Create.of(ImmutableList.of(row)));
+    PCollection<Integer> result = input.apply(ParDo.of(new ExtractTornadoesFn()));
+    PAssert.that(result).containsInAnyOrder(6);
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testNoTornadoes() {
+    TableRow row = new TableRow().set("month", 6).set("tornado", false);
+    PCollection<TableRow> inputs = p.apply(Create.of(ImmutableList.of(row)));
+    PCollection<Integer> result = inputs.apply(ParDo.of(new ExtractTornadoesFn()));
+    PAssert.that(result).empty();
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testEmpty() {
+    PCollection<KV<Integer, Long>> inputs =
+        p.apply(Create.empty(new TypeDescriptor<KV<Integer, Long>>() {}));
+    PCollection<String> result = inputs.apply(ParDo.of(new FormatCountsFn()));
+    PAssert.that(result).empty();
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testFormatCounts() {
+    PCollection<KV<Integer, Long>> inputs =
+        p.apply(Create.of(KV.of(3, 0L), KV.of(4, Long.MAX_VALUE), KV.of(5, Long.MIN_VALUE)));
+    PCollection<String> result = inputs.apply(ParDo.of(new FormatCountsFn()));
+    PAssert.that(result).containsInAnyOrder("3: 0", "4: " + Long.MAX_VALUE, "5: " + Long.MIN_VALUE);
+    p.run().waitUntilFinish();
+  }
+}
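
A possible follow-up (not included in this commit): a pass-through check for the LogOutput transform. The class below is hypothetical; it only relies on LogOutput being visible within the same package, so it is placed in org.apache.beam.examples.cookbook as well.

package org.apache.beam.examples.cookbook;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Hypothetical extra test: LogOutput should log each element and pass it through unchanged. */
@RunWith(JUnit4.class)
public class MinimalBigQueryTornadoesLogOutputTest {
  @Rule public TestPipeline p = TestPipeline.create();

  @Test
  public void testLogOutputPassesElementsThrough() {
    // The prefix only affects the log message, not the emitted elements.
    PCollection<String> input = p.apply(Create.of("3: 0", "6: 2"));
    PCollection<String> result =
        input.apply(ParDo.of(new MinimalBigQueryTornadoes.LogOutput<String>("Result: ")));
    PAssert.that(result).containsInAnyOrder("3: 0", "6: 2");
    p.run().waitUntilFinish();
  }
}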
diff --git a/playground/terraform/infrastructure/setup/iam.tf b/playground/terraform/infrastructure/setup/iam.tf
index ff7cc5cfda8..2f38b238401 100644
--- a/playground/terraform/infrastructure/setup/iam.tf
+++ b/playground/terraform/infrastructure/setup/iam.tf
@@ -29,7 +29,7 @@ resource "google_service_account" "playground_service_account_cf" {
 
 resource "google_project_iam_member" "terraform_service_account_roles" {
   for_each = toset([
-     "roles/container.nodeServiceAccount", "roles/datastore.viewer", 
"roles/artifactregistry.reader", "roles/logging.logWriter", 
"roles/monitoring.metricWriter", "roles/stackdriver.resourceMetadata.writer",
+     "roles/container.nodeServiceAccount", "roles/datastore.viewer", 
"roles/artifactregistry.reader", "roles/logging.logWriter", 
"roles/monitoring.metricWriter", "roles/stackdriver.resourceMetadata.writer", 
"roles/bigquery.readSessionUser", "roles/bigquery.dataViewer",
   ])
   role    = each.value
  member  = "serviceAccount:${google_service_account.playground_service_account.email}"
