This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 92b8dc75286 [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747)
92b8dc75286 is described below

commit 92b8dc75286268f85da59d436ef50e5913dbe9e5
Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com>
AuthorDate: Thu May 26 19:14:47 2022 -0400

    [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747)
---
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      | 17 +++++
 sdks/go/pkg/beam/runners/dataflow/dataflow_test.go | 78 ++++++++++++++++++++++
 .../beam/runners/dataflow/dataflowlib/execute.go   |  2 +-
 .../pkg/beam/runners/dataflow/dataflowlib/job.go   | 18 ++++-
 4 files changed, 111 insertions(+), 4 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index eff46bca3f3..82d869ac5d7 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -78,6 +78,10 @@ var (
        // TODO(BEAM-14512) Turn this on once TO_STRING is implemented
        // enableHotKeyLogging    = flag.Bool("enable_hot_key_logging", false, 
"Specifies that when a hot key is detected in the pipeline, the literal, 
human-readable key is printed in the user's Cloud Logging project (optional).")
 
+       // Streaming update flags
+       update           = flag.Bool("update", false, "Submit this job as an 
update to an existing Dataflow job (optional); the job name must match the 
existing job to update")
+       transformMapping = flag.String("transform_name_mapping", "", 
"JSON-formatted mapping of old transform names to new transform names for 
pipeline updates (optional)")
+
        dryRun         = flag.Bool("dry_run", false, "Dry run. Just print the 
job, but don't submit it.")
        teardownPolicy = flag.String("teardown_policy", "", "Job teardown 
policy (internal only).")
 
@@ -119,6 +123,8 @@ var flagFilter = map[string]bool{
        "teardown_policy":                true,
        "cpu_profiling":                  true,
        "session_recording":              true,
+       "update":                         true,
+       "transform_name_mapping":         true,
 
        // Job Options flags
        "endpoint":                 true,
@@ -256,6 +262,15 @@ func getJobOptions(ctx context.Context) 
(*dataflowlib.JobOptions, error) {
                        return nil, errors.Errorf("invalid flex resource 
scheduling goal. Got %q; Use 
--flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)",
 *flexRSGoal)
                }
        }
+       if !*update && *transformMapping != "" {
+               return nil, errors.New("provided transform_name_mapping without 
setting the --update flag, so the pipeline would not be updated")
+       }
+       var updateTransformMapping map[string]string
+       if *transformMapping != "" {
+               if err := json.Unmarshal([]byte(*transformMapping), 
&updateTransformMapping); err != nil {
+                       return nil, errors.Wrapf(err, "error reading 
--transform_name_mapping flag as JSON")
+               }
+       }
 
        hooks.SerializeHooksToOptions()
 
@@ -317,6 +332,8 @@ func getJobOptions(ctx context.Context) 
(*dataflowlib.JobOptions, error) {
                WorkerZone:             *workerZone,
                TeardownPolicy:         *teardownPolicy,
                ContainerImage:         getContainerImage(ctx),
+               Update:                 *update,
+               TransformNameMapping:   updateTransformMapping,
        }
        if opts.TempLocation == "" {
                opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
index f536fc917ca..620426587e8 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
@@ -197,3 +197,81 @@ func TestGetJobOptions_DockerNoImage(t *testing.T) {
                t.Fatalf("getContainerImage() = %q, want %q", got, want)
        }
 }
+
+func TestGetJobOptions_TransformMapping(t *testing.T) {
+       *labels = `{"label1": "val1", "label2": "val2"}`
+       *stagingLocation = "gs://testStagingLocation"
+       *autoscalingAlgorithm = "NONE"
+       *minCPUPlatform = "testPlatform"
+       *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+
+       *gcpopts.Project = "testProject"
+       *gcpopts.Region = "testRegion"
+
+       *jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+       *jobopts.JobName = "testJob"
+
+       *update = true
+       *transformMapping = `{"transformOne": "transformTwo"}`
+       opts, err := getJobOptions(context.Background())
+       if err != nil {
+               t.Errorf("getJobOptions() returned error, got %v", err)
+       }
+       if opts == nil {
+               t.Fatal("getJobOptions() got nil, want struct")
+       }
+       if got, ok := opts.TransformNameMapping["transformOne"]; !ok || got != 
"transformTwo" {
+               t.Errorf("mismatch in transform mapping got %v, want %v", got, 
"transformTwo")
+       }
+
+}
+
+func TestGetJobOptions_TransformMappingNoUpdate(t *testing.T) {
+       *labels = `{"label1": "val1", "label2": "val2"}`
+       *stagingLocation = "gs://testStagingLocation"
+       *autoscalingAlgorithm = "NONE"
+       *minCPUPlatform = "testPlatform"
+       *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+
+       *gcpopts.Project = "testProject"
+       *gcpopts.Region = "testRegion"
+
+       *jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+       *jobopts.JobName = "testJob"
+
+       *update = false
+       *transformMapping = `{"transformOne": "transformTwo"}`
+
+       opts, err := getJobOptions(context.Background())
+       if err == nil {
+               t.Error("getJobOptions() returned error nil, want an error")
+       }
+       if opts != nil {
+               t.Errorf("getJobOptions() returned JobOptions when it should 
not have, got %#v, want nil", opts)
+       }
+}
+
+func TestGetJobOptions_InvalidMapping(t *testing.T) {
+       *labels = `{"label1": "val1", "label2": "val2"}`
+       *stagingLocation = "gs://testStagingLocation"
+       *autoscalingAlgorithm = "NONE"
+       *minCPUPlatform = "testPlatform"
+       *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+
+       *gcpopts.Project = "testProject"
+       *gcpopts.Region = "testRegion"
+
+       *jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+       *jobopts.JobName = "testJob"
+
+       *update = true
+       *transformMapping = "not a JSON-encoded string"
+
+       opts, err := getJobOptions(context.Background())
+       if err == nil {
+               t.Error("getJobOptions() returned error nil, want an error")
+       }
+       if opts != nil {
+               t.Errorf("getJobOptions() returned JobOptions when it should 
not have, got %#v, want nil", opts)
+       }
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
index abc4db75145..ba81147a183 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -109,7 +109,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, 
opts *JobOptions, worker
        if err != nil {
                return presult, err
        }
-       upd, err := Submit(ctx, client, opts.Project, opts.Region, job)
+       upd, err := Submit(ctx, client, opts.Project, opts.Region, job, 
opts.Update)
        // When in async mode, if we get a 409 because we've already submitted 
an actively running job with the same name
        // just return the existing job as a convenience
        if gErr, ok := err.(*googleapi.Error); async && ok && gErr.Code == 409 {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index fd24729ff9e..e1fa51f7425 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -66,6 +66,10 @@ type JobOptions struct {
        FlexRSGoal          string
        EnableHotKeyLogging bool
 
+       // Streaming update settings
+       Update               bool
+       TransformNameMapping map[string]string
+
        // Autoscaling settings
        Algorithm            string
        MaxNumWorkers        int64
@@ -208,8 +212,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, 
opts *JobOptions, worker
                        TempStoragePrefix: opts.TempLocation,
                        Experiments:       experiments,
                },
-               Labels: opts.Labels,
-               Steps:  steps,
+               Labels:               opts.Labels,
+               TransformNameMapping: opts.TransformNameMapping,
+               Steps:                steps,
        }
 
        workerPool := job.Environment.WorkerPools[0]
@@ -238,7 +243,14 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, 
opts *JobOptions, worker
 }
 
 // Submit submits a prepared job to Cloud Dataflow.
-func Submit(ctx context.Context, client *df.Service, project, region string, 
job *df.Job) (*df.Job, error) {
+func Submit(ctx context.Context, client *df.Service, project, region string, 
job *df.Job, updateJob bool) (*df.Job, error) {
+       if updateJob {
+               runningJob, err := GetRunningJobByName(client, project, region, 
job.Name)
+               if err != nil {
+                       return nil, err
+               }
+               job.ReplaceJobId = runningJob.Id
+       }
        upd, err := client.Projects.Locations.Jobs.Create(project, region, 
job).Do()
        if err == nil {
                log.Infof(ctx, "Submitted job: %v", upd.Id)

Reply via email to