This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 838c030  [BEAM-7383] Adding strict flag to runners to validate with 
vet runner
     new e0c7eb6  Merge pull request #8644 from youngoli/beam7383
838c030 is described below

commit 838c0304a0d6e95eb8ab1be49e864db6fa3c561f
Author: Daniel Oliveira <[email protected]>
AuthorDate: Tue May 21 16:55:55 2019 -0700

    [BEAM-7383] Adding strict flag to runners to validate with vet runner
    
    Creates a pipeline option called beam_strict that's supported by the
    direct and universal runner. Uses the vet runner to perform the
    verification for strict mode.
---
 sdks/go/pkg/beam/options/jobopts/options.go     |  4 ++++
 sdks/go/pkg/beam/runners/direct/direct.go       | 10 ++++++++++
 sdks/go/pkg/beam/runners/universal/universal.go | 13 +++++++++++++
 sdks/go/pkg/beam/runners/vet/vet.go             | 10 ++++++----
 4 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/sdks/go/pkg/beam/options/jobopts/options.go 
b/sdks/go/pkg/beam/options/jobopts/options.go
index 86beb94..fb6b64a 100644
--- a/sdks/go/pkg/beam/options/jobopts/options.go
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -61,6 +61,10 @@ var (
 
        // Async determines whether to wait for job completion.
        Async = flag.Bool("async", false, "Do not wait for job completion.")
+
+       // Strict mode applies additional validation to user pipelines before
+       // executing them and fails early if the pipelines don't pass.
+       Strict = flag.Bool("beam_strict", false, "Apply additional validation 
to pipelines.")
 )
 
 // GetEndpoint returns the endpoint, if non empty and exits otherwise. Runners
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go 
b/sdks/go/pkg/beam/runners/direct/direct.go
index bd1d324..55e4836 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -28,6 +28,8 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
+       "github.com/apache/beam/sdks/go/pkg/beam/runners/vet"
 )
 
 func init() {
@@ -45,6 +47,14 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        log.Info(ctx, "Pipeline:")
        log.Info(ctx, p)
 
+       if *jobopts.Strict {
+               log.Info(ctx, "Strict mode enabled, applying additional 
validation.")
+               if err := vet.Execute(ctx, p); err != nil {
+                       return errors.Wrap(err, "strictness check failed")
+               }
+               log.Info(ctx, "Strict mode validation passed.")
+       }
+
        edges, _, err := p.Build()
        if err != nil {
                return errors.Wrap(err, "invalid pipeline")
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go 
b/sdks/go/pkg/beam/runners/universal/universal.go
index 0e22db7..51f74da 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -30,6 +30,7 @@ import (
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
        "github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
+       "github.com/apache/beam/sdks/go/pkg/beam/runners/vet"
        "github.com/golang/protobuf/proto"
 )
 
@@ -40,6 +41,18 @@ func init() {
 
 // Execute executes the pipeline on a universal beam runner.
 func Execute(ctx context.Context, p *beam.Pipeline) error {
+       if !beam.Initialized() {
+               panic(fmt.Sprint("Beam has not been initialized. Call 
beam.Init() before pipeline construction."))
+       }
+
+       if *jobopts.Strict {
+               log.Info(ctx, "Strict mode enabled, applying additional 
validation.")
+               if err := vet.Execute(ctx, p); err != nil {
+                       return errors.Wrap(err, "strictness check failed")
+               }
+               log.Info(ctx, "Strict mode validation passed.")
+       }
+
        endpoint, err := jobopts.GetEndpoint()
        if err != nil {
                return err
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go 
b/sdks/go/pkg/beam/runners/vet/vet.go
index a700a80..77b8769 100644
--- a/sdks/go/pkg/beam/runners/vet/vet.go
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -27,7 +27,6 @@ package vet
 import (
        "bytes"
        "context"
-       "errors"
        "fmt"
        "reflect"
        "strings"
@@ -43,6 +42,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 func init() {
@@ -54,20 +54,22 @@ func init() {
 type disabledResolver bool
 
 func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
-       return 0, fmt.Errorf("%v not found. Use runtime.RegisterFunction in 
unit tests", name)
+       return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in 
unit tests", name)
 }
 
 // Execute evaluates the pipeline on whether it can run without reflection.
 func Execute(ctx context.Context, p *beam.Pipeline) error {
        e, err := Evaluate(ctx, p)
        if err != nil {
-               return err
+               return errors.WithContext(err, "validating pipeline with vet 
runner")
        }
        if !e.Performant() {
                e.summary()
                e.Generate("main")
                e.diag("*/\n")
-               return fmt.Errorf("pipeline is not performant, see diagnostic 
summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
+               err := errors.Errorf("pipeline is not performant, see 
diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
+               err = errors.WithContext(err, "validating pipeline with vet 
runner")
+               return errors.SetTopLevelMsg(err, "pipeline is not performant")
        }
        // Pipeline nas no further tasks.
        return nil

Reply via email to