This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2039b39f96f [BEAM-14509] Add several flags to dataflow runner (#17752)
2039b39f96f is described below

commit 2039b39f96ff96e0899a830e560c02c6c8e17acb
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Wed May 25 15:23:08 2022 -0400

    [BEAM-14509] Add several flags to dataflow runner (#17752)
---
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      | 112 +++++++++++++--------
 sdks/go/pkg/beam/runners/dataflow/dataflow_test.go |  39 ++++++-
 .../pkg/beam/runners/dataflow/dataflowlib/job.go   |  21 +++-
 3 files changed, 123 insertions(+), 49 deletions(-)
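
For context: these are ordinary Go SDK pipeline flags, registered by the dataflow runner package. A minimal sketch (not part of the commit; the pipeline body and flag values are placeholders) of a pipeline binary that would pick them up:

    package main

    import (
        "context"
        "flag"

        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
    )

    func main() {
        flag.Parse() // surfaces the runner flags, including the new ones
        beam.Init()
        p := beam.NewPipeline()
        // ... build the pipeline here ...
        // Flags added by this commit include, for example:
        //   --flexrs_goal=FLEXRS_COST_OPTIMIZED
        //   --dataflow_service_options=opt1,opt2
        //   --number_of_worker_harness_threads=4
        //   --dataflow_kms_key=<Cloud KMS key resource name>
        if _, err := dataflow.Execute(context.Background(), p); err != nil {
            panic(err)
        }
    }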

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 33b4b0fc66d..eff46bca3f3 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -51,26 +51,32 @@ import (
 // TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
 
 var (
-       endpoint             = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).")
-       stagingLocation      = flag.String("staging_location", "", "GCS staging location (required).")
-       image                = flag.String("worker_harness_container_image", "", "Worker harness container image (required).")
-       labels               = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).")
-       serviceAccountEmail  = flag.String("service_account_email", "", "Service account email (optional).")
-       numWorkers           = flag.Int64("num_workers", 0, "Number of workers (optional).")
-       maxNumWorkers        = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
-       diskSizeGb           = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).")
-       diskType             = flag.String("disk_type", "", "Type of root disk for VMs (optional).")
-       autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
-       zone                 = flag.String("zone", "", "GCP zone (optional)")
-       network              = flag.String("network", "", "GCP network (optional)")
-       subnetwork           = flag.String("subnetwork", "", "GCP subnetwork (optional)")
-       noUsePublicIPs       = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)")
-       tempLocation         = flag.String("temp_location", "", "Temp location (optional)")
-       machineType          = flag.String("worker_machine_type", "", "GCE machine type (optional)")
-       minCPUPlatform       = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
-       workerJar            = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
-       workerRegion         = flag.String("worker_region", "", "Dataflow worker region (optional)")
-       workerZone           = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
+       endpoint               = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).")
+       stagingLocation        = flag.String("staging_location", "", "GCS staging location (required).")
+       image                  = flag.String("worker_harness_container_image", "", "Worker harness container image (optional).")
+       labels                 = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).")
+       serviceAccountEmail    = flag.String("service_account_email", "", "Service account email (optional).")
+       numWorkers             = flag.Int64("num_workers", 0, "Number of workers (optional).")
+       workerHarnessThreads   = flag.Int64("number_of_worker_harness_threads", 0, "The number of threads per each worker harness process (optional).")
+       maxNumWorkers          = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
+       diskSizeGb             = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).")
+       diskType               = flag.String("disk_type", "", "Type of root disk for VMs (optional).")
+       autoscalingAlgorithm   = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
+       zone                   = flag.String("zone", "", "GCP zone (optional)")
+       kmsKey                 = flag.String("dataflow_kms_key", "", "The Cloud KMS key identifier used to encrypt data at rest (optional).")
+       network                = flag.String("network", "", "GCP network (optional)")
+       subnetwork             = flag.String("subnetwork", "", "GCP subnetwork (optional)")
+       noUsePublicIPs         = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)")
+       tempLocation           = flag.String("temp_location", "", "Temp location (optional)")
+       machineType            = flag.String("worker_machine_type", "", "GCE machine type (optional)")
+       minCPUPlatform         = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
+       workerJar              = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
+       workerRegion           = flag.String("worker_region", "", "Dataflow worker region (optional)")
+       workerZone             = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
+       dataflowServiceOptions = flag.String("dataflow_service_options", "", "Comma separated list of additional job modes and configurations (optional)")
+       flexRSGoal             = flag.String("flexrs_goal", "", "Which Flexible Resource Scheduling mode to run in (optional)")
+       // TODO(BEAM-14512) Turn this on once TO_STRING is implemented
+       // enableHotKeyLogging    = flag.Bool("enable_hot_key_logging", false, "Specifies that when a hot key is detected in the pipeline, the literal, human-readable key is printed in the user's Cloud Logging project (optional).")
 
        dryRun         = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.")
        teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).")
@@ -242,6 +248,15 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
                }
        }
 
+       if *flexRSGoal != "" {
+               switch *flexRSGoal {
+               case "FLEXRS_UNSPECIFIED", "FLEXRS_SPEED_OPTIMIZED", "FLEXRS_COST_OPTIMIZED":
+                       // valid values
+               default:
+                       return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
+               }
+       }
+
        hooks.SerializeHooksToOptions()
 
        experiments := jobopts.GetExperiments()
@@ -267,32 +282,41 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
                experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform))
        }
 
+       var dfServiceOptions []string
+       if *dataflowServiceOptions != "" {
+               dfServiceOptions = strings.Split(*dataflowServiceOptions, ",")
+       }
+
        beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
        opts := &dataflowlib.JobOptions{
-               Name:                jobopts.GetJobName(),
-               Experiments:         experiments,
-               Options:             beam.PipelineOptions.Export(),
-               Project:             project,
-               Region:              region,
-               Zone:                *zone,
-               Network:             *network,
-               Subnetwork:          *subnetwork,
-               NoUsePublicIPs:      *noUsePublicIPs,
-               NumWorkers:          *numWorkers,
-               MaxNumWorkers:       *maxNumWorkers,
-               DiskSizeGb:          *diskSizeGb,
-               DiskType:            *diskType,
-               Algorithm:           *autoscalingAlgorithm,
-               MachineType:         *machineType,
-               Labels:              jobLabels,
-               ServiceAccountEmail: *serviceAccountEmail,
-               TempLocation:        *tempLocation,
-               Worker:              *jobopts.WorkerBinary,
-               WorkerJar:           *workerJar,
-               WorkerRegion:        *workerRegion,
-               WorkerZone:          *workerZone,
-               TeardownPolicy:      *teardownPolicy,
-               ContainerImage:      getContainerImage(ctx),
+               Name:                   jobopts.GetJobName(),
+               Experiments:            experiments,
+               DataflowServiceOptions: dfServiceOptions,
+               Options:                beam.PipelineOptions.Export(),
+               Project:                project,
+               Region:                 region,
+               Zone:                   *zone,
+               KmsKey:                 *kmsKey,
+               Network:                *network,
+               Subnetwork:             *subnetwork,
+               NoUsePublicIPs:         *noUsePublicIPs,
+               NumWorkers:             *numWorkers,
+               MaxNumWorkers:          *maxNumWorkers,
+               WorkerHarnessThreads:   *workerHarnessThreads,
+               DiskSizeGb:             *diskSizeGb,
+               DiskType:               *diskType,
+               Algorithm:              *autoscalingAlgorithm,
+               FlexRSGoal:             *flexRSGoal,
+               MachineType:            *machineType,
+               Labels:                 jobLabels,
+               ServiceAccountEmail:    *serviceAccountEmail,
+               TempLocation:           *tempLocation,
+               Worker:                 *jobopts.WorkerBinary,
+               WorkerJar:              *workerJar,
+               WorkerRegion:           *workerRegion,
+               WorkerZone:             *workerZone,
+               TeardownPolicy:         *teardownPolicy,
+               ContainerImage:         getContainerImage(ctx),
        }
        if opts.TempLocation == "" {
                opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
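
A note on the hunk above: the empty-string guard before strings.Split is load-bearing, since splitting an empty string yields a one-element slice rather than an empty one. A standalone sketch (illustrative, not from the commit):

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Without the guard, an unset --dataflow_service_options flag would
        // produce one empty service option instead of none.
        fmt.Println(len(strings.Split("", ",")))          // 1, i.e. [""]
        fmt.Println(len(strings.Split("opt1,opt2", ","))) // 2
    }
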
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
index 29118de855e..f536fc917ca 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
@@ -17,10 +17,11 @@ package dataflow
 
 import (
        "context"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
        "sort"
        "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
 )
 
 func TestDontUseFlagAsPipelineOption(t *testing.T) {
@@ -39,6 +40,8 @@ func TestGetJobOptions(t *testing.T) {
        *stagingLocation = "gs://testStagingLocation"
        *autoscalingAlgorithm = "NONE"
        *minCPUPlatform = "testPlatform"
+       *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+       *dataflowServiceOptions = "opt1,opt2"
 
        *gcpopts.Project = "testProject"
        *gcpopts.Region = "testRegion"
@@ -64,6 +67,17 @@ func TestGetJobOptions(t *testing.T) {
                        }
                }
        }
+       if got, want := len(opts.DataflowServiceOptions), 2; got != want {
+               t.Errorf("len(getJobOptions().DataflowServiceOptions) = %q, want %q", got, want)
+       } else {
+               sort.Strings(opts.DataflowServiceOptions)
+               expectedOptions := []string{"opt1", "opt2"}
+               for i := 0; i < 2; i++ {
+                       if got, want := opts.DataflowServiceOptions[i], expectedOptions[i]; got != want {
+                               t.Errorf("getJobOptions().DataflowServiceOptions = %q, want %q", got, want)
+                       }
+               }
+       }
        if got, want := opts.Project, "testProject"; got != want {
                t.Errorf("getJobOptions().Project = %q, want %q", got, want)
        }
@@ -83,6 +97,9 @@ func TestGetJobOptions(t *testing.T) {
        if got, want := opts.TempLocation, "gs://testStagingLocation/tmp"; got != want {
                t.Errorf("getJobOptions().TempLocation = %q, want %q", got, want)
        }
+       if got, want := opts.FlexRSGoal, "FLEXRS_SPEED_OPTIMIZED"; got != want {
+               t.Errorf("getJobOptions().FlexRSGoal = %q, want %q", got, want)
+       }
 }
 
 func TestGetJobOptions_NoExperimentsSet(t *testing.T) {
@@ -143,6 +160,24 @@ func TestGetJobOptions_InvalidAutoscaling(t *testing.T) {
        }
 }
 
+func TestGetJobOptions_InvalidRsGoal(t *testing.T) {
+       *labels = `{"label1": "val1", "label2": "val2"}`
+       *stagingLocation = "gs://testStagingLocation"
+       *flexRSGoal = "INVALID"
+       *minCPUPlatform = "testPlatform"
+
+       *gcpopts.Project = "testProject"
+       *gcpopts.Region = "testRegion"
+
+       *jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+       *jobopts.JobName = "testJob"
+
+       _, err := getJobOptions(context.Background())
+       if err == nil {
+               t.Fatalf("getJobOptions() returned error nil, want an error")
+       }
+}
+
 func TestGetJobOptions_DockerWithImage(t *testing.T) {
        *jobopts.EnvironmentType = "docker"
        *jobopts.EnvironmentConfig = "testContainerImage"
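
The tests above configure the runner by assigning through the pointers that flag.String and flag.Int64 return, rather than going through flag.Parse. A minimal sketch of that pattern (names illustrative, not from the commit):

    package main

    import (
        "flag"
        "fmt"
    )

    // Illustrative copy of one runner flag; flag.String returns a *string.
    var goal = flag.String("flexrs_goal", "", "FlexRS mode")

    func main() {
        // Tests assign through the pointer directly, bypassing the command line.
        *goal = "FLEXRS_SPEED_OPTIMIZED"
        fmt.Println(*goal)
    }
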
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 459c3f52a4b..fd24729ff9e 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -41,12 +41,15 @@ type JobOptions struct {
        Name string
        // Experiments are additional experiments.
        Experiments []string
+       // DataflowServiceOptions are additional job modes and configurations for Dataflow
+       DataflowServiceOptions []string
        // Pipeline options
        Options runtime.RawOptions
 
        Project             string
        Region              string
        Zone                string
+       KmsKey              string
        Network             string
        Subnetwork          string
        NoUsePublicIPs      bool
@@ -60,10 +63,13 @@ type JobOptions struct {
        WorkerZone          string
        ContainerImage      string
        ArtifactURLs        []string // Additional packages for workers.
+       FlexRSGoal          string
+       EnableHotKeyLogging bool
 
        // Autoscaling settings
-       Algorithm     string
-       MaxNumWorkers int64
+       Algorithm            string
+       MaxNumWorkers        int64
+       WorkerHarnessThreads int64
 
        TempLocation string
 
@@ -155,7 +161,11 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
                Name:      opts.Name,
                Type:      jobType,
                Environment: &df.Environment{
-                       ServiceAccountEmail: opts.ServiceAccountEmail,
+                       DebugOptions: &df.DebugOptions{
+                               EnableHotKeyLogging: opts.EnableHotKeyLogging,
+                       },
+                       FlexResourceSchedulingGoal: opts.FlexRSGoal,
+                       ServiceAccountEmail:        opts.ServiceAccountEmail,
                        UserAgent: newMsg(userAgent{
                                Name:    core.SdkName,
                                Version: core.SdkVersion,
@@ -174,6 +184,8 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
                                },
                                GoOptions: opts.Options,
                        }),
+                       ServiceOptions:    opts.DataflowServiceOptions,
+                       ServiceKmsKeyName: opts.KmsKey,
                        WorkerPools: []*df.WorkerPool{{
                                AutoscalingSettings: &df.AutoscalingSettings{
                                        MaxNumWorkers: opts.MaxNumWorkers,
@@ -205,6 +217,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
        if opts.NumWorkers > 0 {
                workerPool.NumWorkers = opts.NumWorkers
        }
+       if opts.WorkerHarnessThreads > 0 {
+               workerPool.NumThreadsPerWorker = opts.WorkerHarnessThreads
+       }
        if opts.Algorithm != "" {
                workerPool.AutoscalingSettings.Algorithm = map[string]string{
                        "NONE":             "AUTOSCALING_ALGORITHM_NONE",
