This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 2039b39f96f [BEAM-14509] Add several flags to dataflow runner (#17752) 2039b39f96f is described below commit 2039b39f96ff96e0899a830e560c02c6c8e17acb Author: Danny McCormick <dannymccorm...@google.com> AuthorDate: Wed May 25 15:23:08 2022 -0400 [BEAM-14509] Add several flags to dataflow runner (#17752) --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 112 +++++++++++++-------- sdks/go/pkg/beam/runners/dataflow/dataflow_test.go | 39 ++++++- .../pkg/beam/runners/dataflow/dataflowlib/job.go | 21 +++- 3 files changed, 123 insertions(+), 49 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 33b4b0fc66d..eff46bca3f3 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -51,26 +51,32 @@ import ( // TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs. var ( - endpoint = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).") - stagingLocation = flag.String("staging_location", "", "GCS staging location (required).") - image = flag.String("worker_harness_container_image", "", "Worker harness container image (required).") - labels = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).") - serviceAccountEmail = flag.String("service_account_email", "", "Service account email (optional).") - numWorkers = flag.Int64("num_workers", 0, "Number of workers (optional).") - maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).") - diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).") - diskType = flag.String("disk_type", "", "Type of root disk for VMs (optional).") - autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).") - zone = flag.String("zone", "", "GCP zone (optional)") - network = flag.String("network", "", "GCP network (optional)") - subnetwork = flag.String("subnetwork", "", "GCP subnetwork (optional)") - noUsePublicIPs = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)") - tempLocation = flag.String("temp_location", "", "Temp location (optional)") - machineType = flag.String("worker_machine_type", "", "GCE machine type (optional)") - minCPUPlatform = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)") - workerJar = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)") - workerRegion = flag.String("worker_region", "", "Dataflow worker region (optional)") - workerZone = flag.String("worker_zone", "", "Dataflow worker zone (optional)") + endpoint = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).") + stagingLocation = flag.String("staging_location", "", "GCS staging location (required).") + image = flag.String("worker_harness_container_image", "", "Worker harness container image (optional).") + labels = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).") + serviceAccountEmail = flag.String("service_account_email", "", "Service account email (optional).") + numWorkers = flag.Int64("num_workers", 0, "Number of workers (optional).") + workerHarnessThreads = flag.Int64("number_of_worker_harness_threads", 0, "The number of threads per each worker harness process (optional).") + maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).") + diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).") + diskType = flag.String("disk_type", "", "Type of root disk for VMs (optional).") + autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).") + zone = flag.String("zone", "", "GCP zone (optional)") + kmsKey = flag.String("dataflow_kms_key", "", "The Cloud KMS key identifier used to encrypt data at rest (optional).") + network = flag.String("network", "", "GCP network (optional)") + subnetwork = flag.String("subnetwork", "", "GCP subnetwork (optional)") + noUsePublicIPs = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)") + tempLocation = flag.String("temp_location", "", "Temp location (optional)") + machineType = flag.String("worker_machine_type", "", "GCE machine type (optional)") + minCPUPlatform = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)") + workerJar = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)") + workerRegion = flag.String("worker_region", "", "Dataflow worker region (optional)") + workerZone = flag.String("worker_zone", "", "Dataflow worker zone (optional)") + dataflowServiceOptions = flag.String("dataflow_service_options", "", "Comma separated list of additional job modes and configurations (optional)") + flexRSGoal = flag.String("flexrs_goal", "", "Which Flexible Resource Scheduling mode to run in (optional)") + // TODO(BEAM-14512) Turn this on once TO_STRING is implemented + // enableHotKeyLogging = flag.Bool("enable_hot_key_logging", false, "Specifies that when a hot key is detected in the pipeline, the literal, human-readable key is printed in the user's Cloud Logging project (optional).") dryRun = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.") teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).") @@ -242,6 +248,15 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) { } } + if *flexRSGoal != "" { + switch *flexRSGoal { + case "FLEXRS_UNSPECIFIED", "FLEXRS_SPEED_OPTIMIZED", "FLEXRS_COST_OPTIMIZED": + // valid values + default: + return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal) + } + } + hooks.SerializeHooksToOptions() experiments := jobopts.GetExperiments() @@ -267,32 +282,41 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) { experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform)) } + var dfServiceOptions []string + if *dataflowServiceOptions != "" { + dfServiceOptions = strings.Split(*dataflowServiceOptions, ",") + } + beam.PipelineOptions.LoadOptionsFromFlags(flagFilter) opts := &dataflowlib.JobOptions{ - Name: jobopts.GetJobName(), - Experiments: experiments, - Options: beam.PipelineOptions.Export(), - Project: project, - Region: region, - Zone: *zone, - Network: *network, - Subnetwork: *subnetwork, - NoUsePublicIPs: *noUsePublicIPs, - NumWorkers: *numWorkers, - MaxNumWorkers: *maxNumWorkers, - DiskSizeGb: *diskSizeGb, - DiskType: *diskType, - Algorithm: *autoscalingAlgorithm, - MachineType: *machineType, - Labels: jobLabels, - ServiceAccountEmail: *serviceAccountEmail, - TempLocation: *tempLocation, - Worker: *jobopts.WorkerBinary, - WorkerJar: *workerJar, - WorkerRegion: *workerRegion, - WorkerZone: *workerZone, - TeardownPolicy: *teardownPolicy, - ContainerImage: getContainerImage(ctx), + Name: jobopts.GetJobName(), + Experiments: experiments, + DataflowServiceOptions: dfServiceOptions, + Options: beam.PipelineOptions.Export(), + Project: project, + Region: region, + Zone: *zone, + KmsKey: *kmsKey, + Network: *network, + Subnetwork: *subnetwork, + NoUsePublicIPs: *noUsePublicIPs, + NumWorkers: *numWorkers, + MaxNumWorkers: *maxNumWorkers, + WorkerHarnessThreads: *workerHarnessThreads, + DiskSizeGb: *diskSizeGb, + DiskType: *diskType, + Algorithm: *autoscalingAlgorithm, + FlexRSGoal: *flexRSGoal, + MachineType: *machineType, + Labels: jobLabels, + ServiceAccountEmail: *serviceAccountEmail, + TempLocation: *tempLocation, + Worker: *jobopts.WorkerBinary, + WorkerJar: *workerJar, + WorkerRegion: *workerRegion, + WorkerZone: *workerZone, + TeardownPolicy: *teardownPolicy, + ContainerImage: getContainerImage(ctx), } if opts.TempLocation == "" { opts.TempLocation = gcsx.Join(*stagingLocation, "tmp") diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index 29118de855e..f536fc917ca 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -17,10 +17,11 @@ package dataflow import ( "context" - "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts" - "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" "sort" "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts" + "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" ) func TestDontUseFlagAsPipelineOption(t *testing.T) { @@ -39,6 +40,8 @@ func TestGetJobOptions(t *testing.T) { *stagingLocation = "gs://testStagingLocation" *autoscalingAlgorithm = "NONE" *minCPUPlatform = "testPlatform" + *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED" + *dataflowServiceOptions = "opt1,opt2" *gcpopts.Project = "testProject" *gcpopts.Region = "testRegion" @@ -64,6 +67,17 @@ func TestGetJobOptions(t *testing.T) { } } } + if got, want := len(opts.DataflowServiceOptions), 2; got != want { + t.Errorf("len(getJobOptions().DataflowServiceOptions) = %q, want %q", got, want) + } else { + sort.Strings(opts.DataflowServiceOptions) + expectedOptions := []string{"opt1", "opt2"} + for i := 0; i < 2; i++ { + if got, want := opts.DataflowServiceOptions[i], expectedOptions[i]; got != want { + t.Errorf("getJobOptions().DataflowServiceOptions = %q, want %q", got, want) + } + } + } if got, want := opts.Project, "testProject"; got != want { t.Errorf("getJobOptions().Project = %q, want %q", got, want) } @@ -83,6 +97,9 @@ func TestGetJobOptions(t *testing.T) { if got, want := opts.TempLocation, "gs://testStagingLocation/tmp"; got != want { t.Errorf("getJobOptions().TempLocation = %q, want %q", got, want) } + if got, want := opts.FlexRSGoal, "FLEXRS_SPEED_OPTIMIZED"; got != want { + t.Errorf("getJobOptions().FlexRSGoal = %q, want %q", got, want) + } } func TestGetJobOptions_NoExperimentsSet(t *testing.T) { @@ -143,6 +160,24 @@ func TestGetJobOptions_InvalidAutoscaling(t *testing.T) { } } +func TestGetJobOptions_InvalidRsGoal(t *testing.T) { + *labels = `{"label1": "val1", "label2": "val2"}` + *stagingLocation = "gs://testStagingLocation" + *flexRSGoal = "INVALID" + *minCPUPlatform = "testPlatform" + + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + + *jobopts.Experiments = "use_runner_v2,use_portable_job_submission" + *jobopts.JobName = "testJob" + + _, err := getJobOptions(context.Background()) + if err == nil { + t.Fatalf("getJobOptions() returned error nil, want an error") + } +} + func TestGetJobOptions_DockerWithImage(t *testing.T) { *jobopts.EnvironmentType = "docker" *jobopts.EnvironmentConfig = "testContainerImage" diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index 459c3f52a4b..fd24729ff9e 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -41,12 +41,15 @@ type JobOptions struct { Name string // Experiments are additional experiments. Experiments []string + // DataflowServiceOptions are additional job modes and configurations for Dataflow + DataflowServiceOptions []string // Pipeline options Options runtime.RawOptions Project string Region string Zone string + KmsKey string Network string Subnetwork string NoUsePublicIPs bool @@ -60,10 +63,13 @@ type JobOptions struct { WorkerZone string ContainerImage string ArtifactURLs []string // Additional packages for workers. + FlexRSGoal string + EnableHotKeyLogging bool // Autoscaling settings - Algorithm string - MaxNumWorkers int64 + Algorithm string + MaxNumWorkers int64 + WorkerHarnessThreads int64 TempLocation string @@ -155,7 +161,11 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker Name: opts.Name, Type: jobType, Environment: &df.Environment{ - ServiceAccountEmail: opts.ServiceAccountEmail, + DebugOptions: &df.DebugOptions{ + EnableHotKeyLogging: opts.EnableHotKeyLogging, + }, + FlexResourceSchedulingGoal: opts.FlexRSGoal, + ServiceAccountEmail: opts.ServiceAccountEmail, UserAgent: newMsg(userAgent{ Name: core.SdkName, Version: core.SdkVersion, @@ -174,6 +184,8 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker }, GoOptions: opts.Options, }), + ServiceOptions: opts.DataflowServiceOptions, + ServiceKmsKeyName: opts.KmsKey, WorkerPools: []*df.WorkerPool{{ AutoscalingSettings: &df.AutoscalingSettings{ MaxNumWorkers: opts.MaxNumWorkers, @@ -205,6 +217,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker if opts.NumWorkers > 0 { workerPool.NumWorkers = opts.NumWorkers } + if opts.WorkerHarnessThreads > 0 { + workerPool.NumThreadsPerWorker = opts.WorkerHarnessThreads + } if opts.Algorithm != "" { workerPool.AutoscalingSettings.Algorithm = map[string]string{ "NONE": "AUTOSCALING_ALGORITHM_NONE",