This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 833261f0a04 Add minimal example for BigQuery usage (#26713) 833261f0a04 is described below commit 833261f0a041e04ca4dcb8719424fda09edcc5e6 Author: Timur Sultanov <timur.sulta...@akvelon.com> AuthorDate: Tue Jun 6 23:38:38 2023 +0400 Add minimal example for BigQuery usage (#26713) * Add minimal example for BigQuery usage * Add project name to default pipeline options * Add role for Service account - BigQuery * BigQuery DataViewer role added * Update MinimalBigQueryTornadoes.java --------- Co-authored-by: Sergey Makarkin <sergey.makar...@akvelon.com> --- .../cookbook/MinimalBigQueryTornadoes.java | 142 +++++++++++++++++++++ .../cookbook/MinimalBigQueryTornadoesTest.java | 82 ++++++++++++ playground/terraform/infrastructure/setup/iam.tf | 2 +- 3 files changed, 225 insertions(+), 1 deletion(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java new file mode 100644 index 00000000000..43654ea4a05 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.cookbook; + +import com.google.api.services.bigquery.model.TableRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// beam-playground: +// name: MinimalBigQueryTornadoes +// description: An example that reads the public samples of weather data from BigQuery. +// multifile: false +// never_run: true +// always_run: true +// default_example: false +// pipeline_options: --project apache-beam-testing +// context_line: 102 +// categories: +// - Filtering +// - IO +// - Core Transforms +// complexity: BASIC +// tags: +// - filter +// - bigquery +// - strings + +/** + * An example that reads the public samples of weather data from BigQuery, counts the number of + * tornadoes that occur in each month, logs the results, and writes them to a text file. + * + * <p>Concepts: Reading from BigQuery; counting a PCollection; user-defined DoFns + * + * <p>The BigQuery input is taken from {@code clouddataflow-readonly:samples.weather_stations} + */ +public class MinimalBigQueryTornadoes { + private static final Logger LOG = LoggerFactory.getLogger(MinimalBigQueryTornadoes.class); + + // Use a 1000 row subset of the public weather station table publicdata:samples.gsod. 
+ private static final String WEATHER_SAMPLES_TABLE = + "clouddataflow-readonly:samples.weather_stations"; + + /** + * Examines each row in the input table. If a tornado was recorded in that sample, the month in + * which it occurred is output. + */ + static class ExtractTornadoesFn extends DoFn<TableRow, Integer> { + @ProcessElement + public void processElement(ProcessContext c) { + TableRow row = c.element(); + if ((Boolean) row.get("tornado")) { + c.output(Integer.parseInt((String) row.get("month"))); + } + } + } + + /** + * Formats each month and the number of tornadoes counted for it as a printable string of the + * form {@code "<month>: <count>"}. + */ + static class FormatCountsFn extends DoFn<KV<Integer, Long>, String> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().getKey() + ": " + c.element().getValue()); + } + } + + public static void applyBigQueryTornadoes(Pipeline p) { + TypedRead<TableRow> bigqueryIO = + BigQueryIO.readTableRows() + .from(WEATHER_SAMPLES_TABLE) + .withMethod(TypedRead.Method.DIRECT_READ) + .withSelectedFields(Lists.newArrayList("month", "tornado")); + + PCollection<TableRow> rowsFromBigQuery = p.apply(bigqueryIO); + + rowsFromBigQuery + // Extract rows which include information on tornadoes per month. + .apply(ParDo.of(new ExtractTornadoesFn())) + // Count the number of times each month appears in the data. + .apply(Count.perElement()) + // Format each month and count into a printable string. + .apply(ParDo.of(new FormatCountsFn())) + // Write the formatted results to the log. + .apply(ParDo.of(new LogOutput<>("Result: "))) + // Write the formatted results to a file. 
+ .apply(TextIO.write().to("tornadoes")); + } + + public static void runBigQueryTornadoes(PipelineOptions options) { + Pipeline p = Pipeline.create(options); + applyBigQueryTornadoes(p); + p.run().waitUntilFinish(); + } + + public static void main(String[] args) { + PipelineOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); + runBigQueryTornadoes(options); + } + + static class LogOutput<T> extends DoFn<T, T> { + private final String prefix; + + LogOutput(String prefix) { + this.prefix = prefix; + } + + @ProcessElement + public void processElement(ProcessContext c) { + LOG.info(prefix + c.element()); + c.output(c.element()); + } + } +} diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoesTest.java new file mode 100644 index 00000000000..6591f03adb3 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoesTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.cookbook; + +import com.google.api.services.bigquery.model.TableRow; +import org.apache.beam.examples.cookbook.MinimalBigQueryTornadoes.ExtractTornadoesFn; +import org.apache.beam.examples.cookbook.MinimalBigQueryTornadoes.FormatCountsFn; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test case for {@link MinimalBigQueryTornadoes}. */ +@RunWith(JUnit4.class) +public class MinimalBigQueryTornadoesTest { + @Rule public TestPipeline p = TestPipeline.create(); + + @Test + @Category(ValidatesRunner.class) + public void testExtractTornadoes() { + TableRow row = new TableRow().set("month", "6").set("tornado", true); + PCollection<TableRow> input = p.apply(Create.of(ImmutableList.of(row))); + PCollection<Integer> result = input.apply(ParDo.of(new ExtractTornadoesFn())); + PAssert.that(result).containsInAnyOrder(6); + p.run().waitUntilFinish(); + } + + @Test + @Category(ValidatesRunner.class) + public void testNoTornadoes() { + TableRow row = new TableRow().set("month", 6).set("tornado", false); + PCollection<TableRow> inputs = p.apply(Create.of(ImmutableList.of(row))); + PCollection<Integer> result = inputs.apply(ParDo.of(new ExtractTornadoesFn())); + PAssert.that(result).empty(); + p.run().waitUntilFinish(); + } + + @Test + @Category(ValidatesRunner.class) + public void testEmpty() { + PCollection<KV<Integer, Long>> inputs = + 
p.apply(Create.empty(new TypeDescriptor<KV<Integer, Long>>() {})); + PCollection<String> result = inputs.apply(ParDo.of(new FormatCountsFn())); + PAssert.that(result).empty(); + p.run().waitUntilFinish(); + } + + @Test + @Category(ValidatesRunner.class) + public void testFormatCounts() { + PCollection<KV<Integer, Long>> inputs = + p.apply(Create.of(KV.of(3, 0L), KV.of(4, Long.MAX_VALUE), KV.of(5, Long.MIN_VALUE))); + PCollection<String> result = inputs.apply(ParDo.of(new FormatCountsFn())); + PAssert.that(result).containsInAnyOrder("3: 0", "4: " + Long.MAX_VALUE, "5: " + Long.MIN_VALUE); + p.run().waitUntilFinish(); + } +} diff --git a/playground/terraform/infrastructure/setup/iam.tf b/playground/terraform/infrastructure/setup/iam.tf index ff7cc5cfda8..2f38b238401 100644 --- a/playground/terraform/infrastructure/setup/iam.tf +++ b/playground/terraform/infrastructure/setup/iam.tf @@ -29,7 +29,7 @@ resource "google_service_account" "playground_service_account_cf" { resource "google_project_iam_member" "terraform_service_account_roles" { for_each = toset([ - "roles/container.nodeServiceAccount", "roles/datastore.viewer", "roles/artifactregistry.reader", "roles/logging.logWriter", "roles/monitoring.metricWriter", "roles/stackdriver.resourceMetadata.writer", + "roles/container.nodeServiceAccount", "roles/datastore.viewer", "roles/artifactregistry.reader", "roles/logging.logWriter", "roles/monitoring.metricWriter", "roles/stackdriver.resourceMetadata.writer", "roles/bigquery.readSessionUser", "roles/bigquery.dataViewer", ]) role = each.value member = "serviceAccount:${google_service_account.playground_service_account.email}"