This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 6fe157a  [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)
6fe157a is described below

commit 6fe157aa3857dc44fc042bdb2d45a9fbf2dbaf1b
Author: Daniel Oliveira <daniel.o.program...@gmail.com>
AuthorDate: Tue Feb 8 17:42:59 2022 -0800

    [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)

    Also does some polish and fixup to the Kafka taxi example to fix little
    nits that were noticed while adding this new example.
---
 sdks/go/examples/kafka/taxi.go               |  47 +++---
 sdks/go/examples/xlang/bigquery/wordcount.go | 220 +++++++++++++++++++++++++++
 2 files changed, 245 insertions(+), 22 deletions(-)

diff --git a/sdks/go/examples/kafka/taxi.go b/sdks/go/examples/kafka/taxi.go
index 2c9c3c2..aeb539b 100644
--- a/sdks/go/examples/kafka/taxi.go
+++ b/sdks/go/examples/kafka/taxi.go
@@ -22,7 +22,9 @@
 //
 // Running this example requires a Kafka cluster accessible to the runner, and
 // a cross-language expansion service that can expand Kafka read and write
-// transforms.
+// transforms. An address to a persistent expansion service can be provided as
+// a flag, or if none is specified then the SDK will attempt to automatically
+// start an appropriate expansion service.
 //
 // Setting Up a Kafka Cluster
 //
@@ -34,22 +36,24 @@
 //
 // Running an Expansion Server
 //
-// These instructions will cover running the Java IO Expansion Service, and
-// therefore requires a JDK installation in a version supported by Beam.
-// Depending on whether you are running this from a numbered Beam release, or a
-// development environment, there are two sources you may use for the Expansion
-// service.
+// If the automatic expansion service functionality is not available for your
+// environment, or if you want improved performance, you will need to start a
+// persistent expansion service. These instructions will cover running the Java
+// IO Expansion Service, and therefore require a JDK installation in a version
+// supported by Beam. Depending on whether you are running this from a numbered
+// Beam release, or a development environment, there are two sources you may
+// use for the Expansion service.
 //
 // Numbered release: The expansion service jar is vendored as module
-// org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository.
-// This jar can be executed directly with the following command:
+// org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository.
+// This jar can be executed directly with the following command:
 // `java -jar <jar_name> <port_number>`
 // Development env: This requires that the JAVA_HOME environment variable
-// points to your JDK installation. From the root `beam/` directory of the
-// Apache Beam repository, the jar can be built (or built and run) with the
-// following commands:
-// Build: ./gradlew :sdks:java:io:expansion-service:build
-// Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
+// points to your JDK installation. From the root `beam/` directory of the
+// Apache Beam repository, the jar can be built (or built and run) with the
+// following commands:
+// Build: ./gradlew :sdks:java:io:expansion-service:build
+// Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
 //
 // Running the Example on GCP
 //
@@ -64,7 +68,7 @@
 // export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
 // export BOOTSTRAP_SERVERS="123.45.67.89:1234"
 // export EXPANSION_ADDR="localhost:1234"
-// go run ./sdks/go/examples/kafka/types/types.go \
+// go run ./sdks/go/examples/kafka/taxi.go \
 // --runner=DataflowRunner \
 // --temp_location=$TEMP_LOCATION \
 // --staging_location=$STAGING_LOCATION \
@@ -72,7 +76,6 @@
 // --region=$REGION \
 // --job_name="${JOB_NAME}" \
 // --bootstrap_servers=$BOOTSTRAP_SERVER \
-// --experiments=use_portable_job_submission,use_runner_v2 \
 // --expansion_addr=$EXPANSION_ADDR
 //
 // Running the Example From a Git Clone
@@ -91,9 +94,11 @@
 // accessible locally.
 //
 // Additionally, you must provide the location of your custom container to the
-// pipeline with the --sdk_harness_container_image_override flag. For example:
+// pipeline with the --sdk_harness_container_image_override flag for Java, or
+// --environment_config flag for Go. For example:
 //
-// --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest"
+// --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
+// --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
 package main
 
 import (
@@ -111,9 +116,10 @@ import (
 )
 
 var (
-	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
 	bootstrapServers = flag.String("bootstrap_servers", "",
-		"URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+		"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
 	topic = flag.String("topic", "kafka_taxirides_realtime",
 		"Kafka topic to write to and read from.")
 )
@@ -139,9 +145,6 @@ func main() {
 	beam.Init()
 
 	ctx := context.Background()
-	if *expansionAddr == "" {
-		log.Fatal(ctx, "No expansion address provided")
-	}
 
 	p := beam.NewPipeline()
 	s := p.Root()
diff --git a/sdks/go/examples/xlang/bigquery/wordcount.go b/sdks/go/examples/xlang/bigquery/wordcount.go
new file mode 100644
index 0000000..9500d2c
--- /dev/null
+++ b/sdks/go/examples/xlang/bigquery/wordcount.go
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Wordcount is an example using cross-language BigQuery transforms to read and write to BigQuery.
+// This example runs a batch pipeline that reads from the public table "shakespeare" described here:
+// https://cloud.google.com/bigquery/public-data#sample_tables. It reads per-work word counts from
+// that table, aggregates them to find the total count of each word across all works, as well as
+// the average number of times a word appears in the works it appears in, and then writes all that
+// data to a given output table.
+//
+// This example is only expected to work on Dataflow, and requires a cross-language expansion
+// service that can expand BigQuery read and write transforms. An address to a persistent expansion
+// service can be provided as a flag, or if none is specified then the SDK will attempt to
+// automatically start an appropriate expansion service.
+//
+// Running an Expansion Server
+//
+// If the automatic expansion service functionality is not available for your environment, or if
+// you want improved performance, you will need to start a persistent expansion service. These
+// instructions will cover running the Java SchemaIO Expansion Service, and therefore require a JDK
+// installation in a version supported by Beam. Depending on whether you are running this from a
+// numbered Beam release, or a development environment, there are two sources you may use for the
+// Expansion service.
+//
+// Numbered release: The expansion service jar is vendored as module
+// org.apache.beam:beam-sdks-java-io-google-cloud-platform-expansion-service in Maven Repository.
+// This jar can be executed directly with the following command:
+// `java -jar <jar_name> <port_number>`
+//
+// Development env: This requires that the JAVA_HOME environment variable points to your JDK
+// installation. From the root `beam/` directory of the Apache Beam repository, the jar can be
+// built (or built and run) with the following commands:
+// ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
+// ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:runExpansionService -PconstructionService.port=<port_num>
+//
+// Running the Example on GCP
+//
+// An example command for executing this pipeline on GCP is as follows:
+// export PROJECT="$(gcloud config get-value project)"
+// export TEMP_LOCATION="gs://MY-BUCKET/temp"
+// export STAGING_LOCATION="gs://MY-BUCKET/staging"
+// export REGION="us-central1"
+// export JOB_NAME="bigquery-wordcount-`date +%Y%m%d-%H%M%S`"
+// export EXPANSION_ADDR="localhost:1234"
+// export OUTPUT_TABLE="project_id:dataset_id.table_id"
+// go run ./sdks/go/examples/xlang/bigquery/wordcount.go \
+// --runner=DataflowRunner \
+// --temp_location=$TEMP_LOCATION \
+// --staging_location=$STAGING_LOCATION \
+// --project=$PROJECT \
+// --region=$REGION \
+// --job_name="${JOB_NAME}" \
+// --expansion_addr=$EXPANSION_ADDR \
+// --out_table=$OUTPUT_TABLE
+//
+// Running the Example From a Git Clone
+//
+// When running on a development environment, a custom container will likely need to be provided
+// for the cross-language SDK. First this will require building and pushing the SDK container to a
+// container repository, such as Docker Hub.
+//
+// export DOCKER_ROOT="Your Docker Repository Root"
+// ./gradlew :sdks:java:container:java8:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+// docker push $DOCKER_ROOT/beam_java8_sdk:latest
+//
+// For runners in local mode, simply building the container using the default values for
+// docker-repository-root and docker-tag will work to have it accessible locally.
+//
+// Additionally, you must provide the location of your custom container to the pipeline with the
+// --sdk_harness_container_image_override flag for Java, or --environment_config flag for Go. For
+// example:
+//
+// --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
+// --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
+package main
+
+import (
+	"context"
+	"flag"
+	"math"
+	"reflect"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+)
+
+var (
+	// Set this to the address of the expansion service to use for BigQuery read and write, or leave
+	// unspecified to attempt to automatically start an expansion service.
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+	// Set this required option to specify where to write the output. If the table does not exist,
+	// a new one will be created. If the table already exists, elements will be appended to it.
+	outTable = flag.String("out_table", "", "Output table (required).")
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*ShakesRow)(nil)))
+	beam.RegisterType(reflect.TypeOf((*WordsCombine)(nil)))
+	beam.RegisterType(reflect.TypeOf((*CountsRow)(nil)))
+	beam.RegisterType(reflect.TypeOf((*WordsAccum)(nil)))
+}
+
+// ShakesRow is a struct corresponding to the schema of the Shakespeare input table. In order to
+// be read properly, field names must match names from the BigQuery table, so some fields must
+// include underscores.
+type ShakesRow struct {
+	Word        string `beam:"word"`
+	Word_count  int64  `beam:"word_count"`
+	Corpus      string `beam:"corpus"`
+	Corpus_date int64  `beam:"corpus_date"`
+}
+
+// CountsRow is a struct corresponding to the schema of the output table. For writes, field names
+// are derived from the Beam schema names specified below as struct tags.
+type CountsRow struct {
+	// Word is the word being counted.
+	Word string `beam:"word"`
+	// WordCount is the count of how many times the word appears in all works combined.
+	WordCount int64 `beam:"word_count"`
+	// CorpusCount is the count of how many works the word appears in.
+	CorpusCount int64 `beam:"corpus_count"`
+	// AvgCount is the average number of times a word appears in all works that it appears in. In
+	// other words, this is equivalent to WordCount divided by CorpusCount.
+	AvgCount float64 `beam:"avg_count"`
+}
+
+// WordsAccum is an accumulator for combining Shakespeare word counts in order to get averages of
+// word counts.
+type WordsAccum struct {
+	// Word is the word being counted.
+	Word string
+	// Count is the number of times this word has appeared, or in other words the number of corpuses
+	// it appears in (assuming that the input never repeats a word and corpus pair).
+	Count int64
+	// Sum is the sum of word counts from inputs.
+	Sum int64
+}
+
+// WordsCombine is a CombineFn that adds up word counts and calculates the average count per corpus.
+type WordsCombine struct{}
+
+// CreateAccumulator creates a default WordsAccum.
+func (fn *WordsCombine) CreateAccumulator() WordsAccum {
+	return WordsAccum{}
+}
+
+// AddInput sums up word counts and increments the corpus count.
+func (fn *WordsCombine) AddInput(a WordsAccum, row ShakesRow) WordsAccum {
+	a.Word = row.Word
+	a.Count += 1
+	a.Sum += row.Word_count
+	return a
+}
+
+// MergeAccumulators sums up the various counts being accumulated.
+func (fn *WordsCombine) MergeAccumulators(a, v WordsAccum) WordsAccum {
+	return WordsAccum{Word: a.Word, Count: a.Count + v.Count, Sum: a.Sum + v.Sum}
+}
+
+// ExtractOutput calculates the average and fills out the output row.
+func (fn *WordsCombine) ExtractOutput(a WordsAccum) CountsRow {
+	row := CountsRow{
+		Word:        a.Word,
+		WordCount:   a.Sum,
+		CorpusCount: a.Count,
+	}
+	if a.Count == 0 {
+		row.AvgCount = math.NaN()
+	} else {
+		row.AvgCount = float64(a.Sum) / float64(a.Count)
+	}
+	return row
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Read from the public BigQuery table.
+	inType := reflect.TypeOf((*ShakesRow)(nil)).Elem()
+	rows := bigqueryio.Read(s, inType,
+		bigqueryio.FromTable("bigquery-public-data:samples.shakespeare"),
+		bigqueryio.ReadExpansionAddr(*expansionAddr))
+
+	// Combine the data per word.
+	keyed := beam.ParDo(s, func(elm ShakesRow) (string, ShakesRow) {
+		return elm.Word, elm
+	}, rows)
+	counts := beam.CombinePerKey(s, &WordsCombine{}, keyed)
+	countVals := beam.DropKey(s, counts)
+
+	// Write the data to the given BigQuery table destination, creating the table if needed.
+	bigqueryio.Write(s, *outTable, countVals,
+		bigqueryio.CreateDisposition(bigqueryio.CreateIfNeeded),
+		bigqueryio.WriteExpansionAddr(*expansionAddr))
+
+	ctx := context.Background()
+	if err := beamx.Run(ctx, p); err != nil {
+		log.Fatalf(ctx, "Failed to execute job: %v", err)
+	}
+}
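
As a quick sanity check of the aggregation performed by WordsCombine above, the following standalone Go sketch (not part of the committed diff) mirrors the accumulator math on two hand-made rows. The word and its counts are hypothetical, chosen only to make the arithmetic easy to follow.

package main

import "fmt"

// row mirrors the fields of ShakesRow that the CombineFn actually uses.
type row struct {
	word  string
	count int64
}

// accum mirrors WordsAccum: per word, the number of corpora seen and the total count.
type accum struct {
	word  string
	count int64 // number of corpora the word was seen in
	sum   int64 // total occurrences across all corpora
}

func main() {
	// Hypothetical inputs: the word appears 100 times in one work and 50 times in another.
	inputs := []row{{"brave", 100}, {"brave", 50}}

	// AddInput: fold each row into the accumulator.
	var a accum
	for _, r := range inputs {
		a.word = r.word
		a.count++
		a.sum += r.count
	}

	// ExtractOutput: WordCount = sum, CorpusCount = count, AvgCount = sum / count.
	avg := float64(a.sum) / float64(a.count)
	fmt.Printf("word=%s word_count=%d corpus_count=%d avg_count=%.1f\n",
		a.word, a.sum, a.count, avg)
	// Output: word=brave word_count=150 corpus_count=2 avg_count=75.0
}

Running it prints word_count=150, corpus_count=2, and avg_count=75.0, which is what ExtractOutput would populate into CountsRow for the same inputs.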