This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new aef01da [BEAM-7086] Refactoring repetitive error code. new 5556467 Merge pull request #8674 from youngoli/beam7086 aef01da is described below commit aef01da787f6c92bf8d7491dfa89ac43cbe85ace Author: Daniel Oliveira <daniel.o.program...@gmail.com> AuthorDate: Thu May 23 15:46:19 2019 -0700 [BEAM-7086] Refactoring repetitive error code. Straightforward change, make some function literals for the really repetitive errors. --- sdks/go/pkg/beam/core/graph/bind.go | 30 ++++++++++++---------- sdks/go/pkg/beam/core/graph/edge.go | 51 ++++++++++++++++++------------------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/sdks/go/pkg/beam/core/graph/bind.go b/sdks/go/pkg/beam/core/graph/bind.go index 77a6d69..f5cc7f8 100644 --- a/sdks/go/pkg/beam/core/graph/bind.go +++ b/sdks/go/pkg/beam/core/graph/bind.go @@ -60,30 +60,34 @@ import ( // // Here, the inbound shape and output types are different from before. func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in ...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, []typex.FullType, error) { + addContext := func(err error, fn *funcx.Fn) error { + return errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) + } + inbound, kinds, err := findInbound(fn, in...) 
if err != nil { - return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) + return nil, nil, nil, nil, addContext(err, fn) } outbound, err := findOutbound(fn) if err != nil { - return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) + return nil, nil, nil, nil, addContext(err, fn) } subst, err := typex.Bind(inbound, in) if err != nil { - return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) + return nil, nil, nil, nil, addContext(err, fn) } for k, v := range typedefs { if substK, exists := subst[k]; exists { err := errors.Errorf("cannot substitute type %v with %v, already defined as %v", k, v, substK) - return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) + return nil, nil, nil, nil, addContext(err, fn) } subst[k] = v } out, err := typex.Substitute(outbound, subst) if err != nil { - return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name()) + return nil, nil, nil, nil, addContext(err, fn) } return inbound, kinds, outbound, out, nil } @@ -128,6 +132,9 @@ func returnTypes(list []funcx.ReturnParam) []reflect.Type { func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputKind, error) { // log.Printf("Bind inbound: %v %v", fn, in) + addContext := func(err error, p []funcx.FnParam, in interface{}) error { + return errors.WithContextf(err, "binding params %v to input %v", p, in) + } var inbound []typex.FullType var kinds []InputKind @@ -136,29 +143,26 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputK for _, input := range in { arity, err := inboundArity(input, index == 0) if err != nil { - return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params, input) + return nil, nil, addContext(err, params, input) } if len(params)-index < arity { - err := errors.New("too few params") - return nil, nil, errors.WithContextf(err, "binding params %v to input %v", 
params[index:], input) + return nil, nil, addContext(errors.New("too few params"), params[index:], input) } paramsToBind := params[index : index+arity] elm, kind, err := tryBindInbound(input, paramsToBind, index == 0) if err != nil { - return nil, nil, errors.WithContextf(err, "binding params %v to input %v", paramsToBind, input) + return nil, nil, addContext(err, paramsToBind, input) } inbound = append(inbound, elm) kinds = append(kinds, kind) index += arity } if index < len(params) { - err := errors.New("too few inputs: forgot an input or to annotate options?") - return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in) + return nil, nil, addContext(errors.New("too few inputs: forgot an input or to annotate options?"), params, in) } if index > len(params) { - err := errors.New("too many inputs") - return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in) + return nil, nil, addContext(errors.New("too many inputs"), params, in) } return inbound, kinds, nil } diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go index 846dceb..dcdfd0b 100644 --- a/sdks/go/pkg/beam/core/graph/edge.go +++ b/sdks/go/pkg/beam/core/graph/edge.go @@ -186,14 +186,15 @@ func (e *MultiEdge) String() string { // NewCoGBK inserts a new CoGBK edge into the graph. func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) { + addContext := func(err error, s *Scope) error { + return errors.WithContextf(err, "creating new CoGBK in scope %v", s) + } + if len(ns) == 0 { - // TODO(BEAM-7086) Reduce the repetition in the context of all the errors in this file. 
- err := errors.New("needs at least 1 input") - return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) + return nil, addContext(errors.New("needs at least 1 input"), s) } if !typex.IsKV(ns[0].Type()) { - err := errors.Errorf("input type must be KV: %v", ns[0]) - return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) + return nil, addContext(errors.Errorf("input type must be KV: %v", ns[0]), s) } // (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> -> CoGBK<T,U,..,Z>. @@ -206,20 +207,16 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) { for i := 1; i < len(ns); i++ { n := ns[i] if !typex.IsKV(n.Type()) { - err := errors.Errorf("input type must be KV: %v", n) - return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) + return nil, addContext(errors.Errorf("input type must be KV: %v", n), s) } if !n.Coder.Components[0].Equals(c) { - err := errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c) - return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) + return nil, addContext(errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c), s) } if !w.Equals(n.WindowingStrategy()) { - err := errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w) - return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) + return nil, addContext(errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w), s) } if bounded != n.Bounded() { - err := errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded) - return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s) + return nil, addContext(errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded), s) } comp = append(comp, n.Type().Components()[1]) @@ -242,9 +239,12 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) { // NewFlatten inserts a new 
Flatten edge in the graph. Flatten output type is // the shared input type. func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) { + addContext := func(err error, s *Scope) error { + return errors.WithContextf(err, "creating new Flatten in scope %v", s) + } + if len(in) < 2 { - err := errors.Errorf("Flatten needs at least 2 input, got %v", len(in)) - return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s) + return nil, addContext(errors.Errorf("Flatten needs at least 2 input, got %v", len(in)), s) } t := in[0].Type() w := inputWindow(in) @@ -260,17 +260,14 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) { } for _, n := range in { if !typex.IsEqual(t, n.Type()) { - err := errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t) - return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s) + return nil, addContext(errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t), s) } if !w.Equals(n.WindowingStrategy()) { - err := errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w) - return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s) + return nil, addContext(errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w), s) } } if typex.IsCoGBK(t) { - err := errors.Errorf("Flatten input type cannot be CoGBK: %v", t) - return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s) + return nil, addContext(errors.Errorf("Flatten input type cannot be CoGBK: %v", t), s) } edge := g.NewEdge(s) @@ -338,14 +335,16 @@ const CombinePerKeyScope = "CombinePerKey" // NewCombine inserts a new Combine edge into the graph. Combines cannot have side // input. 
func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*MultiEdge, error) { + addContext := func(err error, s *Scope) error { + return errors.WithContextf(err, "creating new Combine in scope %v", s) + } + inT := in.Type() if !typex.IsCoGBK(inT) { - err := errors.Errorf("Combine requires CoGBK type: %v", inT) - return nil, errors.WithContextf(err, "creating new Combine in scope %v", s) + return nil, addContext(errors.Errorf("Combine requires CoGBK type: %v", inT), s) } if len(inT.Components()) > 2 { - err := errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT) - return nil, errors.WithContextf(err, "creating new Combine in scope %v", s) + return nil, addContext(errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT), s) } // Create a synthetic function for binding purposes. It takes main input @@ -394,7 +393,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*M inbound, kinds, outbound, out, err := Bind(synth, nil, inT) if err != nil { - return nil, errors.WithContextf(err, "creating new Combine in scope %v", s) + return nil, addContext(err, s) } edge := g.NewEdge(s)