This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 92b8dc75286 [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747) 92b8dc75286 is described below commit 92b8dc75286268f85da59d436ef50e5913dbe9e5 Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com> AuthorDate: Thu May 26 19:14:47 2022 -0400 [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747) --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 17 +++++ sdks/go/pkg/beam/runners/dataflow/dataflow_test.go | 78 ++++++++++++++++++++++ .../beam/runners/dataflow/dataflowlib/execute.go | 2 +- .../pkg/beam/runners/dataflow/dataflowlib/job.go | 18 ++++- 4 files changed, 111 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index eff46bca3f3..82d869ac5d7 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -78,6 +78,10 @@ var ( // TODO(BEAM-14512) Turn this on once TO_STRING is implemented // enableHotKeyLogging = flag.Bool("enable_hot_key_logging", false, "Specifies that when a hot key is detected in the pipeline, the literal, human-readable key is printed in the user's Cloud Logging project (optional).") + // Streaming update flags + update = flag.Bool("update", false, "Submit this job as an update to an existing Dataflow job (optional); the job name must match the existing job to update") + transformMapping = flag.String("transform_name_mapping", "", "JSON-formatted mapping of old transform names to new transform names for pipeline updates (optional)") + dryRun = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.") teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).") @@ -119,6 +123,8 @@ var flagFilter = map[string]bool{ "teardown_policy": true, "cpu_profiling": true, "session_recording": true, + "update": true, + "transform_name_mapping": true, // Job Options flags "endpoint": true, @@ -256,6 +262,15 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) { return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal) } } + if !*update && *transformMapping != "" { + return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated") + } + var updateTransformMapping map[string]string + if *transformMapping != "" { + if err := json.Unmarshal([]byte(*transformMapping), &updateTransformMapping); err != nil { + return nil, errors.Wrapf(err, "error reading --transform_name_mapping flag as JSON") + } + } hooks.SerializeHooksToOptions() @@ -317,6 +332,8 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) { WorkerZone: *workerZone, TeardownPolicy: *teardownPolicy, ContainerImage: getContainerImage(ctx), + Update: *update, + TransformNameMapping: updateTransformMapping, } if opts.TempLocation == "" { opts.TempLocation = gcsx.Join(*stagingLocation, "tmp") diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index f536fc917ca..620426587e8 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -197,3 +197,81 @@ func TestGetJobOptions_DockerNoImage(t *testing.T) { t.Fatalf("getContainerImage() = %q, want %q", got, want) } } + +func TestGetJobOptions_TransformMapping(t *testing.T) { + *labels = `{"label1": "val1", "label2": "val2"}` + *stagingLocation = "gs://testStagingLocation" + *autoscalingAlgorithm = "NONE" + *minCPUPlatform = "testPlatform" + *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED" + + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + + *jobopts.Experiments = "use_runner_v2,use_portable_job_submission" + *jobopts.JobName = "testJob" + + *update = true + *transformMapping = `{"transformOne": "transformTwo"}` + opts, err := getJobOptions(context.Background()) + if err != nil { + t.Errorf("getJobOptions() returned error, got %v", err) + } + if opts == nil { + t.Fatal("getJobOptions() got nil, want struct") + } + if got, ok := opts.TransformNameMapping["transformOne"]; !ok || got != "transformTwo" { + t.Errorf("mismatch in transform mapping got %v, want %v", got, "transformTwo") + } + +} + +func TestGetJobOptions_TransformMappingNoUpdate(t *testing.T) { + *labels = `{"label1": "val1", "label2": "val2"}` + *stagingLocation = "gs://testStagingLocation" + *autoscalingAlgorithm = "NONE" + *minCPUPlatform = "testPlatform" + *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED" + + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + + *jobopts.Experiments = "use_runner_v2,use_portable_job_submission" + *jobopts.JobName = "testJob" + + *update = false + *transformMapping = `{"transformOne": "transformTwo"}` + + opts, err := getJobOptions(context.Background()) + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + +func TestGetJobOptions_InvalidMapping(t *testing.T) { + *labels = `{"label1": "val1", "label2": "val2"}` + *stagingLocation = "gs://testStagingLocation" + *autoscalingAlgorithm = "NONE" + *minCPUPlatform = "testPlatform" + *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED" + + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + + *jobopts.Experiments = "use_runner_v2,use_portable_job_submission" + *jobopts.JobName = "testJob" + + *update = true + *transformMapping = "not a JSON-encoded string" + + opts, err := getJobOptions(context.Background()) + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go index abc4db75145..ba81147a183 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go @@ -109,7 +109,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker if err != nil { return presult, err } - upd, err := Submit(ctx, client, opts.Project, opts.Region, job) + upd, err := Submit(ctx, client, opts.Project, opts.Region, job, opts.Update) // When in async mode, if we get a 409 because we've already submitted an actively running job with the same name // just return the existing job as a convenience if gErr, ok := err.(*googleapi.Error); async && ok && gErr.Code == 409 { diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index fd24729ff9e..e1fa51f7425 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -66,6 +66,10 @@ type JobOptions struct { FlexRSGoal string EnableHotKeyLogging bool + // Streaming update settings + Update bool + TransformNameMapping map[string]string + // Autoscaling settings Algorithm string MaxNumWorkers int64 @@ -208,8 +212,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker TempStoragePrefix: opts.TempLocation, Experiments: experiments, }, - Labels: opts.Labels, - Steps: steps, + Labels: opts.Labels, + TransformNameMapping: opts.TransformNameMapping, + Steps: steps, } workerPool := job.Environment.WorkerPools[0] @@ -238,7 +243,14 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker } // Submit submits a prepared job to Cloud Dataflow. -func Submit(ctx context.Context, client *df.Service, project, region string, job *df.Job) (*df.Job, error) { +func Submit(ctx context.Context, client *df.Service, project, region string, job *df.Job, updateJob bool) (*df.Job, error) { + if updateJob { + runningJob, err := GetRunningJobByName(client, project, region, job.Name) + if err != nil { + return nil, err + } + job.ReplaceJobId = runningJob.Id + } upd, err := client.Projects.Locations.Jobs.Create(project, region, job).Do() if err == nil { log.Infof(ctx, "Submitted job: %v", upd.Id)