[ 
https://issues.apache.org/jira/browse/BEAM-5327?focusedWorklogId=141909&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141909
 ]

ASF GitHub Bot logged work on BEAM-5327:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Sep/18 19:14
            Start Date: 06/Sep/18 19:14
    Worklog Time Spent: 10m 
      Work Description: herohde closed pull request #6336: [BEAM-5327] Add support for custom dataflow worker jar in Go
URL: https://github.com/apache/beam/pull/6336
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index db722ba08fe..af3cce04415 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -55,6 +55,7 @@ var (
        tempLocation    = flag.String("temp_location", "", "Temp location 
(optional)")
        machineType     = flag.String("worker_machine_type", "", "GCE machine 
type (optional)")
        minCPUPlatform  = flag.String("min_cpu_platform", "", "GCE minimum cpu 
platform (optional)")
+       workerJar       = flag.String("dataflow_worker_jar", "", "Dataflow 
worker jar (optional)")
 
        dryRun         = flag.Bool("dry_run", false, "Dry run. Just print the 
job, but don't submit it.")
        teardownPolicy = flag.String("teardown_policy", "", "Job teardown 
policy (internal only).")
@@ -129,6 +130,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                Labels:         jobLabels,
                TempLocation:   *tempLocation,
                Worker:         *jobopts.WorkerBinary,
+               WorkerJar:      *workerJar,
                TeardownPolicy: *teardownPolicy,
        }
        if opts.TempLocation == "" {
@@ -149,12 +151,13 @@ func Execute(ctx context.Context, p *beam.Pipeline) error 
{
        id := atomic.AddInt32(&unique, 1)
        modelURL := gcsx.Join(*stagingLocation, fmt.Sprintf("model-%v-%v", id, 
time.Now().UnixNano()))
        workerURL := gcsx.Join(*stagingLocation, fmt.Sprintf("worker-%v-%v", 
id, time.Now().UnixNano()))
+       jarURL := gcsx.Join(*stagingLocation, 
fmt.Sprintf("dataflow-worker-%v-%v.jar", id, time.Now().UnixNano()))
 
        if *dryRun {
                log.Info(ctx, "Dry-run: not submitting job!")
 
                log.Info(ctx, proto.MarshalTextString(model))
-               job, err := dataflowlib.Translate(model, opts, workerURL, 
modelURL)
+               job, err := dataflowlib.Translate(model, opts, workerURL, 
jarURL, modelURL)
                if err != nil {
                        return err
                }
@@ -162,7 +165,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                return nil
        }
 
-       _, err = dataflowlib.Execute(ctx, model, opts, workerURL, modelURL, 
*endpoint, false)
+       _, err = dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, 
modelURL, *endpoint, false)
        return err
 }
 
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
index 15e36d01b1f..eb7ec794d22 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -31,7 +31,7 @@ import (
 )
 
 // Execute submits a pipeline as a Dataflow job.
-func Execute(ctx context.Context, raw *pb.Pipeline, opts *JobOptions, 
workerURL, modelURL, endpoint string, async bool) (string, error) {
+func Execute(ctx context.Context, raw *pb.Pipeline, opts *JobOptions, 
workerURL, jarURL, modelURL, endpoint string, async bool) (string, error) {
        // (1) Upload Go binary to GCS.
 
        bin := opts.Worker
@@ -56,11 +56,20 @@ func Execute(ctx context.Context, raw *pb.Pipeline, opts 
*JobOptions, workerURL,
 
        log.Infof(ctx, "Staging worker binary: %v", bin)
 
-       if err := StageWorker(ctx, opts.Project, workerURL, bin); err != nil {
+       if err := StageFile(ctx, opts.Project, workerURL, bin); err != nil {
                return "", err
        }
        log.Infof(ctx, "Staged worker binary: %v", workerURL)
 
+       if opts.WorkerJar != "" {
+               log.Infof(ctx, "Staging Dataflow worker jar: %v", 
opts.WorkerJar)
+
+               if err := StageFile(ctx, opts.Project, jarURL, opts.WorkerJar); 
err != nil {
+                       return "", err
+               }
+               log.Infof(ctx, "Staged worker jar: %v", jarURL)
+       }
+
        // (2) Fixup and upload model to GCS
 
        p, err := Fixup(raw)
@@ -76,7 +85,7 @@ func Execute(ctx context.Context, raw *pb.Pipeline, opts 
*JobOptions, workerURL,
 
        // (3) Translate to v1b3 and submit
 
-       job, err := Translate(p, opts, workerURL, modelURL)
+       job, err := Translate(p, opts, workerURL, jarURL, modelURL)
        if err != nil {
                return "", err
        }
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 9a1c1853d95..9bf7e2e395d 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -53,6 +53,8 @@ type JobOptions struct {
 
        // Worker is the worker binary override.
        Worker string
+       // WorkerJar is a custom worker jar.
+       WorkerJar string
 
        // -- Internal use only. Not supported in public Dataflow. --
 
@@ -60,7 +62,7 @@ type JobOptions struct {
 }
 
 // Translate translates a pipeline to a Dataflow job.
-func Translate(p *pb.Pipeline, opts *JobOptions, workerURL, modelURL string) 
(*df.Job, error) {
+func Translate(p *pb.Pipeline, opts *JobOptions, workerURL, jarURL, modelURL 
string) (*df.Job, error) {
        // (1) Translate pipeline to v1b3 speak.
 
        steps, err := translate(p)
@@ -82,6 +84,21 @@ func Translate(p *pb.Pipeline, opts *JobOptions, workerURL, 
modelURL string) (*d
                return nil, fmt.Errorf("Dataflow supports one container image 
only: %v", images)
        }
 
+       packages := []*df.Package{{
+               Name:     "worker",
+               Location: workerURL,
+       }}
+       experiments := append(opts.Experiments, "beam_fn_api")
+
+       if opts.WorkerJar != "" {
+               jar := &df.Package{
+                       Name:     "dataflow-worker.jar",
+                       Location: jarURL,
+               }
+               packages = append(packages, jar)
+               experiments = append(experiments, 
"use_staged_dataflow_worker_jar")
+       }
+
        job := &df.Job{
                ProjectId: opts.Project,
                Name:      opts.Name,
@@ -100,15 +117,13 @@ func Translate(p *pb.Pipeline, opts *JobOptions, 
workerURL, modelURL string) (*d
                                Options: dataflowOptions{
                                        PipelineURL: modelURL,
                                        Region:      opts.Region,
+                                       Experiments: experiments,
                                },
                                GoOptions: opts.Options,
                        }),
                        WorkerPools: []*df.WorkerPool{{
-                               Kind: "harness",
-                               Packages: []*df.Package{{
-                                       Location: workerURL,
-                                       Name:     "worker",
-                               }},
+                               Kind:                        "harness",
+                               Packages:                    packages,
                                WorkerHarnessContainerImage: images[0],
                                NumWorkers:                  1,
                                MachineType:                 opts.MachineType,
@@ -116,7 +131,7 @@ func Translate(p *pb.Pipeline, opts *JobOptions, workerURL, 
modelURL string) (*d
                                Zone:                        opts.Zone,
                        }},
                        TempStoragePrefix: opts.TempLocation,
-                       Experiments:       append(opts.Experiments, 
"beam_fn_api"),
+                       Experiments:       experiments,
                },
                Labels: opts.Labels,
                Steps:  steps,
@@ -132,6 +147,7 @@ func Translate(p *pb.Pipeline, opts *JobOptions, workerURL, 
modelURL string) (*d
                // Add separate data disk for streaming jobs
                job.Environment.WorkerPools[0].DataDisks = []*df.Disk{{}}
        }
+
        return job, nil
 }
 
@@ -191,8 +207,9 @@ func NewClient(ctx context.Context, endpoint string) 
(*df.Service, error) {
 }
 
 type dataflowOptions struct {
-       PipelineURL string `json:"pipelineUrl"`
-       Region      string `json:"region"`
+       Experiments []string `json:"experiments,omitempty"`
+       PipelineURL string   `json:"pipelineUrl"`
+       Region      string   `json:"region"`
 }
 
 func printOptions(opts *JobOptions, images []string) []*displayData {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
index 46f0aed6a3e..f467fea6570 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
@@ -31,15 +31,15 @@ func StageModel(ctx context.Context, project, modelURL 
string, model []byte) err
        return upload(ctx, project, modelURL, bytes.NewReader(model))
 }
 
-// StageWorker uploads the worker binary to GCS as a unique object.
-func StageWorker(ctx context.Context, project, workerURL, worker string) error 
{
-       fd, err := os.Open(worker)
+// StageFile uploads a file to GCS.
+func StageFile(ctx context.Context, project, url, filename string) error {
+       fd, err := os.Open(filename)
        if err != nil {
-               return fmt.Errorf("failed to open worker binary %s: %v", 
worker, err)
+               return fmt.Errorf("failed to open file %s: %v", filename, err)
        }
        defer fd.Close()
 
-       return upload(ctx, project, workerURL, fd)
+       return upload(ctx, project, url, fd)
 }
 
 func upload(ctx context.Context, project, object string, r io.Reader) error {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 141909)
    Time Spent: 40m  (was: 0.5h)

> Go support for custom dataflow worker jar
> -----------------------------------------
>
>                 Key: BEAM-5327
>                 URL: https://issues.apache.org/jira/browse/BEAM-5327
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to