This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 8b213c6 [BEAM-11095] Better error handling for iter/reiter/multimap (#16794) 8b213c6 is described below commit 8b213c617ef8cf3a077bb0002b6b0fec8e85cb05 Author: Danny McCormick <dannymccorm...@google.com> AuthorDate: Wed Feb 9 21:54:03 2022 -0500 [BEAM-11095] Better error handling for iter/reiter/multimap (#16794) --- sdks/go/pkg/beam/core/funcx/fn.go | 10 +++- sdks/go/pkg/beam/core/funcx/fn_test.go | 15 +++++ sdks/go/pkg/beam/core/funcx/sideinput.go | 95 ++++++++++++++++++++++++++------ 3 files changed, 101 insertions(+), 19 deletions(-) diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go index cfa66a0..c1bbea5 100644 --- a/sdks/go/pkg/beam/core/funcx/fn.go +++ b/sdks/go/pkg/beam/core/funcx/fn.go @@ -350,7 +350,15 @@ func New(fn reflectx.Func) (*Fn, error) { if ok, err := IsMalformedEmit(t); ok { return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t) } - // TODO(damccorm) 2022.02.08: Handle IsMalformed[Iter, ReIter, MultiMap] cases (BEAM-11095) + if ok, err := IsMalformedIter(t); ok { + return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t) + } + if ok, err := IsMalformedReIter(t); ok { + return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t) + } + if ok, err := IsMalformedMultiMap(t); ok { + return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t) + } return nil, errors.Errorf("bad parameter type for %s: %v", fn.Name(), t) } diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go index 71b73ff..78fd3a1 100644 --- a/sdks/go/pkg/beam/core/funcx/fn_test.go +++ b/sdks/go/pkg/beam/core/funcx/fn_test.go @@ -232,6 +232,21 @@ func TestNew(t *testing.T) { }, Err: errors.New(errIllegalParametersInEmit), }, + { + Name: "errIllegalParametersInIter - malformed Iter", + Fn: func(int, func(*nonConcreteType) bool, func(*int, *string) bool) {}, + Err: errors.New(errIllegalParametersInIter), + }, + { + Name: "errIllegalParametersInIter - malformed ReIter", + Fn: func(int, func() func(*nonConcreteType) bool, func(*int, *string) bool) {}, + Err: errors.New(errIllegalParametersInReIter), + }, + { + Name: "errIllegalParametersInMultiMap - malformed MultiMap", + Fn: func(int, func(string) func(*nonConcreteType) bool) {}, + Err: errors.New(errIllegalParametersInMultiMap), + }, } for _, test := range tests { diff --git a/sdks/go/pkg/beam/core/funcx/sideinput.go b/sdks/go/pkg/beam/core/funcx/sideinput.go index e1f328a..168ff3c 100644 --- a/sdks/go/pkg/beam/core/funcx/sideinput.go +++ b/sdks/go/pkg/beam/core/funcx/sideinput.go @@ -20,6 +20,13 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" +) + +var ( + errIllegalParametersInIter = "All parameters in an iter must be universal type, container type, or concrete type" + errIllegalParametersInReIter = "Output of a reiter must be valid iter type" + errIllegalParametersInMultiMap = "Output of a multimap must be valid iter type" ) // IsIter returns true iff the supplied type is a "single sweep functional iterator". @@ -30,10 +37,20 @@ import ( // will be copied into the supplied pointers. The function returns true if // data was copied, and false if there is no more data available. func IsIter(t reflect.Type) bool { - _, ok := UnfoldIter(t) + _, ok, _ := unfoldIter(t) return ok } +// IsMalformedIter returns true iff the supplied type is an illegal "single sweep +// functional iterator" and an error explaining why it is illegal. For example, +// an iterator is not legal if one of its parameters is not concrete, universal, or +// a container type. If the type does not have the structure of an iter or it is a +// legal iter, IsMalformedIter returns false and no error. +func IsMalformedIter(t reflect.Type) (bool, error) { + _, _, err := unfoldIter(t) + return err != nil, err +} + // UnfoldIter returns the parameter types, if a single sweep functional // iterator. For example: // @@ -42,15 +59,20 @@ func IsIter(t reflect.Type) bool { // func (*typex.EventTime, *int) bool returns {typex.EventTime, int} // func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) { + types, ok, _ := unfoldIter(t) + return types, ok +} + +func unfoldIter(t reflect.Type) ([]reflect.Type, bool, error) { if t.Kind() != reflect.Func { - return nil, false + return nil, false, nil } if t.NumOut() != 1 || t.Out(0) != reflectx.Bool { - return nil, false + return nil, false, nil } if t.NumIn() == 0 { - return nil, false + return nil, false, nil } var ret []reflect.Type @@ -60,23 +82,26 @@ func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) { skip = 1 } if t.NumIn()-skip > 2 || t.NumIn() == skip { - return nil, false + return nil, false, nil } for i := skip; i < t.NumIn(); i++ { - if !isOutParam(t.In(i)) { - return nil, false + if ok, err := isOutParam(t.In(i)); !ok { + return nil, false, errors.Wrap(err, errIllegalParametersInIter) } ret = append(ret, t.In(i).Elem()) } - return ret, true + return ret, true, nil } -func isOutParam(t reflect.Type) bool { +func isOutParam(t reflect.Type) (bool, error) { if t.Kind() != reflect.Ptr { - return false + return false, errors.Errorf("Type %v of kind %v not allowed, must be ptr type", t, t.Kind()) } - return typex.IsConcrete(t.Elem()) || typex.IsUniversal(t.Elem()) || typex.IsContainer(t.Elem()) + if typex.IsUniversal(t.Elem()) || typex.IsContainer(t.Elem()) { + return true, nil + } + return typex.CheckConcrete(t.Elem()) } // IsReIter returns true iff the supplied type is a functional iterator generator. @@ -88,15 +113,34 @@ func IsReIter(t reflect.Type) bool { return ok } +// IsMalformedReIter returns true iff the supplied type is an illegal functional +// iterator generator and an error explaining why it is illegal. An iterator generator +// is not legal if its output is not of type iterator. If the type does not +// have the structure of an iterator generator or it is a legal iterator generator, +// IsMalformedReIter returns false and no error. +func IsMalformedReIter(t reflect.Type) (bool, error) { + _, _, err := unfoldReIter(t) + return err != nil, err +} + // UnfoldReIter returns the parameter types, if a functional iterator generator. func UnfoldReIter(t reflect.Type) ([]reflect.Type, bool) { + types, ok, _ := unfoldReIter(t) + return types, ok +} + +func unfoldReIter(t reflect.Type) ([]reflect.Type, bool, error) { if t.Kind() != reflect.Func { - return nil, false + return nil, false, nil } if t.NumIn() != 0 || t.NumOut() != 1 { - return nil, false + return nil, false, nil + } + types, ok, err := unfoldIter(t.Out(0)) + if err != nil { + err = errors.Wrap(err, errIllegalParametersInReIter) } - return UnfoldIter(t.Out(0)) + return types, ok, err } // IsMultiMap returns true iff the supplied type is a keyed functional iterator @@ -109,17 +153,32 @@ func IsMultiMap(t reflect.Type) bool { return ok } +// IsMalformedMultiMap returns true iff the supplied type is an illegal keyed functional +// iterator generator and an error explaining why it is illegal. A keyed iterator generator +// is not legal if its output is not of type iterator. If the type does not have the +// structure of a keyed iterator generator or it is a legal iterator generator, +// IsMalformedMultiMap returns false and no error. +func IsMalformedMultiMap(t reflect.Type) (bool, error) { + _, _, err := unfoldMultiMap(t) + return err != nil, err +} + // UnfoldMultiMap returns the parameter types for the input key and the output // values iff the type is a keyed functional iterator generator. func UnfoldMultiMap(t reflect.Type) ([]reflect.Type, bool) { + types, ok, _ := unfoldMultiMap(t) + return types, ok +} + +func unfoldMultiMap(t reflect.Type) ([]reflect.Type, bool, error) { if t.Kind() != reflect.Func { - return nil, false + return nil, false, nil } if t.NumIn() != 1 || t.NumOut() != 1 { - return nil, false + return nil, false, nil } types := []reflect.Type{t.In(0)} - iterTypes, is := UnfoldIter(t.Out(0)) + iterTypes, is, err := unfoldIter(t.Out(0)) types = append(types, iterTypes...) - return types, is + return types, is, errors.Wrap(err, errIllegalParametersInMultiMap) }