This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fe157a  [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)
6fe157a is described below

commit 6fe157aa3857dc44fc042bdb2d45a9fbf2dbaf1b
Author: Daniel Oliveira <daniel.o.program...@gmail.com>
AuthorDate: Tue Feb 8 17:42:59 2022 -0800

    [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786)
    
    Also does some polish and fixup to the Kafka taxi example to fix little
    nits that were noticed while adding this new example.
---
 sdks/go/examples/kafka/taxi.go               |  47 +++---
 sdks/go/examples/xlang/bigquery/wordcount.go | 220 +++++++++++++++++++++++++++
 2 files changed, 245 insertions(+), 22 deletions(-)

diff --git a/sdks/go/examples/kafka/taxi.go b/sdks/go/examples/kafka/taxi.go
index 2c9c3c2..aeb539b 100644
--- a/sdks/go/examples/kafka/taxi.go
+++ b/sdks/go/examples/kafka/taxi.go
@@ -22,7 +22,9 @@
 //
 // Running this example requires a Kafka cluster accessible to the runner, and
 // a cross-language expansion service that can expand Kafka read and write
-// transforms.
+// transforms. An address to a persistent expansion service can be provided as
+// a flag, or if none is specified then the SDK will attempt to automatically
+// start an appropriate expansion service.
 //
 // Setting Up a Kafka Cluster
 //
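[Editor's illustration, not part of this commit: the auto-start behavior described above means user code can simply pass the expansion address flag through, empty or not. A minimal sketch of a cross-language Kafka read, assuming the kafkaio wrapper at sdks/go/pkg/beam/io/xlang/kafkaio and its Read(scope, expansionAddr, bootstrapServers, topics) signature from this era of the SDK:]

    package main

    import (
    	"context"
    	"flag"

    	"github.com/apache/beam/sdks/v2/go/pkg/beam"
    	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
    	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    )

    var (
    	expansionAddr    = flag.String("expansion_addr", "", "Optional address of a persistent expansion service.")
    	bootstrapServers = flag.String("bootstrap_servers", "", "Kafka bootstrap servers, accessible by the runner.")
    )

    func init() {
    	beam.RegisterFunction(logMsg)
    }

    // logMsg logs the value of each Kafka record.
    func logMsg(ctx context.Context, k, v []byte) {
    	log.Infof(ctx, "Kafka message: %s", string(v))
    }

    func main() {
    	flag.Parse()
    	beam.Init()

    	p := beam.NewPipeline()
    	s := p.Root()

    	// An empty expansion address asks the SDK to auto-start an appropriate
    	// expansion service; a non-empty one targets a persistent service.
    	msgs := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{"kafka_taxirides_realtime"})
    	beam.ParDo0(s, logMsg, msgs)

    	if err := beamx.Run(context.Background(), p); err != nil {
    		log.Fatalf(context.Background(), "Failed to execute job: %v", err)
    	}
    }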
@@ -34,22 +36,24 @@
 //
 // Running an Expansion Server
 //
-// These instructions will cover running the Java IO Expansion Service, and
-// therefore requires a JDK installation in a version supported by Beam.
-// Depending on whether you are running this from a numbered Beam release, or a
-// development environment, there are two sources you may use for the Expansion
-// service.
+// If the automatic expansion service functionality is not available for your
+// environment, or if you want improved performance, you will need to start a
+// persistent expansion service. These instructions cover running the Java IO
+// Expansion Service, which requires a JDK installation in a version
+// supported by Beam. Depending on whether you are running this from a numbered
+// Beam release, or a development environment, there are two sources you may
+// use for the Expansion service.
 //
 // Numbered release: The expansion service jar is vendored as module
-//   org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository.
-//   This jar can be executed directly with the following command:
+// org.apache.beam:beam-sdks-java-io-expansion-service in Maven Repository.
+// This jar can be executed directly with the following command:
 //   `java -jar <jar_name> <port_number>`
 // Development env: This requires that the JAVA_HOME environment variable
-//   points to your JDK installation. From the root `beam/` directory of the
-//   Apache Beam repository, the jar can be built (or built and run) with the
-//   following commands:
-//     Build: ./gradlew :sdks:java:io:expansion-service:build
-//     Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
+// points to your JDK installation. From the root `beam/` directory of the
+// Apache Beam repository, the jar can be built (or built and run) with the
+// following commands:
+//   Build: ./gradlew :sdks:java:io:expansion-service:build
+//   Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>
 //
 // Running the Example on GCP
 //
@@ -64,7 +68,7 @@
 //   export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
 //   export BOOTSTRAP_SERVERS="123.45.67.89:1234"
 //   export EXPANSION_ADDR="localhost:1234"
-//   go run ./sdks/go/examples/kafka/types/types.go \
+//   go run ./sdks/go/examples/kafka/taxi.go \
 //     --runner=DataflowRunner \
 //     --temp_location=$TEMP_LOCATION \
 //     --staging_location=$STAGING_LOCATION \
@@ -72,7 +76,6 @@
 //     --region=$REGION \
 //     --job_name="${JOB_NAME}" \
//     --bootstrap_servers=$BOOTSTRAP_SERVERS \
-//     --experiments=use_portable_job_submission,use_runner_v2 \
 //     --expansion_addr=$EXPANSION_ADDR
 //
 // Running the Example From a Git Clone
@@ -91,9 +94,11 @@
 // accessible locally.
 //
 // Additionally, you must provide the location of your custom container to the
-// pipeline with the --sdk_harness_container_image_override flag. For example:
+// pipeline with the --sdk_harness_container_image_override flag for Java, or
+// --environment_config flag for Go. For example:
 //
-//   --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest"
+//   --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
+//   --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
 package main
 
 import (
@@ -111,9 +116,10 @@ import (
 )
 
 var (
-       expansionAddr    = flag.String("expansion_addr", "", "Address of Expansion Service")
+       expansionAddr = flag.String("expansion_addr", "",
+               "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
        bootstrapServers = flag.String("bootstrap_servers", "",
-               "URL of the bootstrap servers for the Kafka cluster. Should be 
accessible by the runner.")
+               "(Required) URL of the bootstrap servers for the Kafka cluster. 
Should be accessible by the runner.")
        topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
 )
 
@@ -139,9 +145,6 @@ func main() {
        beam.Init()
 
        ctx := context.Background()
-       if *expansionAddr == "" {
-               log.Fatal(ctx, "No expansion address provided")
-       }
 
        p := beam.NewPipeline()
        s := p.Root()
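[Editor's note: with the expansion address now optional, the old fatal check is gone, but bootstrap_servers is still marked "(Required)". This diff does not show a replacement guard; a minimal hypothetical one, reusing the file's existing log import and ctx, might be:]

    // Hypothetical guard (not shown in this diff), placed after beam.Init():
    if *bootstrapServers == "" {
    	log.Fatal(ctx, "No bootstrap servers provided. --bootstrap_servers is required.")
    }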
diff --git a/sdks/go/examples/xlang/bigquery/wordcount.go b/sdks/go/examples/xlang/bigquery/wordcount.go
new file mode 100644
index 0000000..9500d2c
--- /dev/null
+++ b/sdks/go/examples/xlang/bigquery/wordcount.go
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Wordcount is an example using cross-language BigQuery transforms to read and write to BigQuery.
+// This example runs a batch pipeline that reads from the public table "shakespeare" described here:
+// https://cloud.google.com/bigquery/public-data#sample_tables. It reads the word counts per work,
+// aggregates them to find total word counts across all works, as well as the average number of
+// times a word appears in the works it appears in, and then writes all that data to a given
+// output table.
+//
+// This example is only expected to work on Dataflow, and requires a cross-language expansion
+// service that can expand BigQuery read and write transforms. An address to a persistent expansion
+// service can be provided as a flag, or if none is specified then the SDK will attempt to
+// automatically start an appropriate expansion service.
+//
+// Running an Expansion Server
+//
+// If the automatic expansion service functionality is not available for your environment, or if
+// you want improved performance, you will need to start a persistent expansion service. These
+// instructions cover running the Java SchemaIO Expansion Service, which requires a JDK
+// installation in a version supported by Beam. Depending on whether you are running this from a
+// numbered Beam release, or a development environment, there are two sources you may use for the
+// Expansion service.
+//
+// Numbered release: The expansion service jar is vendored as module
+// org.apache.beam:beam-sdks-java-io-google-cloud-platform-expansion-service in Maven Repository.
+// This jar can be executed directly with the following command:
+//   `java -jar <jar_name> <port_number>`
+//
+// Development env: This requires that the JAVA_HOME environment variable points to your JDK
+// installation. From the root `beam/` directory of the Apache Beam repository, the jar can be
+// built (or built and run) with the following commands:
+//   ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
+//   ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:runExpansionService -PconstructionService.port=<port_num>
+//
+// Running the Example on GCP
+//
+// An example command for executing this pipeline on GCP is as follows:
+//   export PROJECT="$(gcloud config get-value project)"
+//   export TEMP_LOCATION="gs://MY-BUCKET/temp"
+//   export STAGING_LOCATION="gs://MY-BUCKET/staging"
+//   export REGION="us-central1"
+//   export JOB_NAME="bigquery-wordcount-`date +%Y%m%d-%H%M%S`"
+//   export EXPANSION_ADDR="localhost:1234"
+//   export OUTPUT_TABLE="project_id:dataset_id.table_id"
+//   go run ./sdks/go/examples/xlang/bigquery/wordcount.go \
+//     --runner=DataflowRunner \
+//     --temp_location=$TEMP_LOCATION \
+//     --staging_location=$STAGING_LOCATION \
+//     --project=$PROJECT \
+//     --region=$REGION \
+//     --job_name="${JOB_NAME}" \
+//     --expansion_addr=$EXPANSION_ADDR \
+//     --out_table=$OUTPUT_TABLE
+//
+// Running the Example From a Git Clone
+//
+// When running on a development environment, a custom container will likely need to be provided
+// for the cross-language SDK. First this will require building and pushing the SDK container to a
+// container repository, such as Docker Hub.
+//
+//   export DOCKER_ROOT="Your Docker Repository Root"
+//   ./gradlew :sdks:java:container:java8:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+//   docker push $DOCKER_ROOT/beam_java8_sdk:latest
+//
+// For runners in local mode, simply building the container using the default values for
+// docker-repository-root and docker-tag will work to have it accessible locally.
+//
+// Additionally, you must provide the location of your custom container to the pipeline with the
+// --sdk_harness_container_image_override flag for Java, or --environment_config flag for Go. For
+// example:
+//
+//   --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \
+//   --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
+package main
+
+import (
+       "context"
+       "flag"
+       "math"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+)
+
+var (
+       // Set this to the address of the expansion service to use for BigQuery read and write, or leave
+       // unspecified to attempt to automatically start an expansion service.
+       expansionAddr = flag.String("expansion_addr", "",
+               "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+       // Set this required option to specify where to write the output. If the table does not exist,
+       // a new one will be created. If the table already exists, elements will be appended to it.
+       outTable = flag.String("out_table", "", "Output table (required).")
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*ShakesRow)(nil)))
+       beam.RegisterType(reflect.TypeOf((*WordsCombine)(nil)))
+       beam.RegisterType(reflect.TypeOf((*CountsRow)(nil)))
+       beam.RegisterType(reflect.TypeOf((*WordsAccum)(nil)))
+}
+
+// ShakesRow is a struct corresponding to the schema of the Shakespeare input table. In order to
+// be read properly, field names must match names from the BigQuery table, so some fields must
+// include underscores.
+type ShakesRow struct {
+       Word        string `beam:"word"`
+       Word_count  int64  `beam:"word_count"`
+       Corpus      string `beam:"corpus"`
+       Corpus_date int64  `beam:"corpus_date"`
+}
+
+// CountsRow is a struct corresponding to the schema of the output table. For writes, field names
+// are derived from the Beam schema names specified below as struct tags.
+type CountsRow struct {
+       // Word is the word being counted.
+       Word string `beam:"word"`
+       // WordCount is the count of how many times the word appears in all works combined.
+       WordCount int64 `beam:"word_count"`
+       // CorpusCount is the count of how many works the word appears in.
+       CorpusCount int64 `beam:"corpus_count"`
+       // AvgCount is the average number of times a word appears in all works that it appears in. In
+       // other words, this is equivalent to WordCount divided by CorpusCount.
+       AvgCount float64 `beam:"avg_count"`
+}
+
+// WordsAccum is an accumulator for combining Shakespeare word counts in order to get averages of
+// word counts.
+type WordsAccum struct {
+       // Word is the word being counted.
+       Word string
+       // Count is the number of times this word has appeared, or in other words the number of corpuses
+       // it appears in (assuming that the input never repeats a word and corpus pair).
+       Count int64
+       // Sum is the sum of word counts from inputs.
+       Sum int64
+}
+
+// WordsCombine is a CombineFn that adds up word counts and calculates the average count per work.
+type WordsCombine struct{}
+
+// CreateAccumulator creates a default WordsAccum.
+func (fn *WordsCombine) CreateAccumulator() WordsAccum {
+       return WordsAccum{}
+}
+
+// AddInput sums up word counts and increments the corpus count.
+func (fn *WordsCombine) AddInput(a WordsAccum, row ShakesRow) WordsAccum {
+       a.Word = row.Word
+       a.Count += 1
+       a.Sum += row.Word_count
+       return a
+}
+
+// MergeAccumulators sums up the various counts being accumulated.
+func (fn *WordsCombine) MergeAccumulators(a, v WordsAccum) WordsAccum {
+       return WordsAccum{Word: a.Word, Count: a.Count + v.Count, Sum: a.Sum + v.Sum}
+}
+
+// ExtractOutput calculates the average and fills out the output rows.
+func (fn *WordsCombine) ExtractOutput(a WordsAccum) CountsRow {
+       row := CountsRow{
+               Word:        a.Word,
+               WordCount:   a.Sum,
+               CorpusCount: a.Count,
+       }
+       if a.Count == 0 {
+               row.AvgCount = math.NaN()
+       } else {
+               row.AvgCount = float64(a.Sum) / float64(a.Count)
+       }
+       return row
+}
+
+func main() {
+       flag.Parse()
+       beam.Init()
+
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       // Read from the public BigQuery table.
+       inType := reflect.TypeOf((*ShakesRow)(nil)).Elem()
+       rows := bigqueryio.Read(s, inType,
+               bigqueryio.FromTable("bigquery-public-data:samples.shakespeare"),
+               bigqueryio.ReadExpansionAddr(*expansionAddr))
+
+       // Combine the data per word.
+       keyed := beam.ParDo(s, func(elm ShakesRow) (string, ShakesRow) {
+               return elm.Word, elm
+       }, rows)
+       counts := beam.CombinePerKey(s, &WordsCombine{}, keyed)
+       countVals := beam.DropKey(s, counts)
+
+       // Write the data to the given BigQuery table destination, creating the table if needed.
+       bigqueryio.Write(s, *outTable, countVals,
+               bigqueryio.CreateDisposition(bigqueryio.CreateIfNeeded),
+               bigqueryio.WriteExpansionAddr(*expansionAddr))
+
+       ctx := context.Background()
+       if err := beamx.Run(ctx, p); err != nil {
+               log.Fatalf(ctx, "Failed to execute job: %v", err)
+       }
+}
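[Editor's illustration, not part of this commit: because WordsCombine is plain Go, its CombineFn lifecycle can be sanity-checked by hand without a runner. This sketch reuses the types declared above and feeds in the word "the" from two hypothetical works:]

    fn := &WordsCombine{}
    // Each accumulator receives one (word, corpus) input row.
    a := fn.AddInput(fn.CreateAccumulator(), ShakesRow{Word: "the", Word_count: 100, Corpus: "hamlet", Corpus_date: 1600})
    b := fn.AddInput(fn.CreateAccumulator(), ShakesRow{Word: "the", Word_count: 50, Corpus: "macbeth", Corpus_date: 1606})
    // Merging sums counts; extraction computes the average per corpus.
    row := fn.ExtractOutput(fn.MergeAccumulators(a, b))
    // row == CountsRow{Word: "the", WordCount: 150, CorpusCount: 2, AvgCount: 75}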
