This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b213c6  [BEAM-11095] Better error handling for iter/reiter/multimap (#16794)
8b213c6 is described below

commit 8b213c617ef8cf3a077bb0002b6b0fec8e85cb05
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Wed Feb 9 21:54:03 2022 -0500

    [BEAM-11095] Better error handling for iter/reiter/multimap (#16794)
---
 sdks/go/pkg/beam/core/funcx/fn.go        | 10 +++-
 sdks/go/pkg/beam/core/funcx/fn_test.go   | 15 +++++
 sdks/go/pkg/beam/core/funcx/sideinput.go | 95 ++++++++++++++++++++++++++------
 3 files changed, 101 insertions(+), 19 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index cfa66a0..c1bbea5 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -350,7 +350,15 @@ func New(fn reflectx.Func) (*Fn, error) {
                        if ok, err := IsMalformedEmit(t); ok {
                                return nil, errors.Wrapf(err, "bad parameter 
type for %s: %v", fn.Name(), t)
                        }
-                       // TODO(damccorm) 2022.02.08: Handle IsMalformed[Iter, 
ReIter, MultiMap] cases (BEAM-11095)
+                       if ok, err := IsMalformedIter(t); ok {
+                               return nil, errors.Wrapf(err, "bad parameter 
type for %s: %v", fn.Name(), t)
+                       }
+                       if ok, err := IsMalformedReIter(t); ok {
+                               return nil, errors.Wrapf(err, "bad parameter 
type for %s: %v", fn.Name(), t)
+                       }
+                       if ok, err := IsMalformedMultiMap(t); ok {
+                               return nil, errors.Wrapf(err, "bad parameter 
type for %s: %v", fn.Name(), t)
+                       }
                        return nil, errors.Errorf("bad parameter type for %s: 
%v", fn.Name(), t)
                }
 
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 71b73ff..78fd3a1 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -232,6 +232,21 @@ func TestNew(t *testing.T) {
                        },
                        Err: errors.New(errIllegalParametersInEmit),
                },
+               {
+                       Name: "errIllegalParametersInIter - malformed Iter",
+                       Fn:   func(int, func(*nonConcreteType) bool, func(*int, 
*string) bool) {},
+                       Err:  errors.New(errIllegalParametersInIter),
+               },
+               {
+                       Name: "errIllegalParametersInIter - malformed ReIter",
+                       Fn:   func(int, func() func(*nonConcreteType) bool, 
func(*int, *string) bool) {},
+                       Err:  errors.New(errIllegalParametersInReIter),
+               },
+               {
+                       Name: "errIllegalParametersInMultiMap - malformed 
MultiMap",
+                       Fn:   func(int, func(string) func(*nonConcreteType) 
bool) {},
+                       Err:  errors.New(errIllegalParametersInMultiMap),
+               },
        }
 
        for _, test := range tests {
diff --git a/sdks/go/pkg/beam/core/funcx/sideinput.go b/sdks/go/pkg/beam/core/funcx/sideinput.go
index e1f328a..168ff3c 100644
--- a/sdks/go/pkg/beam/core/funcx/sideinput.go
+++ b/sdks/go/pkg/beam/core/funcx/sideinput.go
@@ -20,6 +20,13 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+)
+
+var (
+       errIllegalParametersInIter     = "All parameters in an iter must be 
universal type, container type, or concrete type"
+       errIllegalParametersInReIter   = "Output of a reiter must be valid iter 
type"
+       errIllegalParametersInMultiMap = "Output of a multimap must be valid 
iter type"
 )
 
 // IsIter returns true iff the supplied type is a "single sweep functional iterator".
@@ -30,10 +37,20 @@ import (
 // will be copied into the supplied pointers. The function returns true if
 // data was copied, and false if there is no more data available.
 func IsIter(t reflect.Type) bool {
-       _, ok := UnfoldIter(t)
+       _, ok, _ := unfoldIter(t)
        return ok
 }
 
+// IsMalformedIter returns true iff the supplied type is an illegal "single sweep
+// functional iterator" and an error explaining why it is illegal. For example,
+// an iterator is not legal if one of its parameters is not concrete, universal, or
+// a container type. If the type does not have the structure of an iter or it is a
+// legal iter, IsMalformedIter returns false and no error.
+func IsMalformedIter(t reflect.Type) (bool, error) {
+       _, _, err := unfoldIter(t)
+       return err != nil, err
+}
+
 // UnfoldIter returns the parameter types, if a single sweep functional
 // iterator. For example:
 //
@@ -42,15 +59,20 @@ func IsIter(t reflect.Type) bool {
 //     func (*typex.EventTime, *int) bool returns {typex.EventTime, int}
 //
 func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) {
+       types, ok, _ := unfoldIter(t)
+       return types, ok
+}
+
+func unfoldIter(t reflect.Type) ([]reflect.Type, bool, error) {
        if t.Kind() != reflect.Func {
-               return nil, false
+               return nil, false, nil
        }
 
        if t.NumOut() != 1 || t.Out(0) != reflectx.Bool {
-               return nil, false
+               return nil, false, nil
        }
        if t.NumIn() == 0 {
-               return nil, false
+               return nil, false, nil
        }
 
        var ret []reflect.Type
@@ -60,23 +82,26 @@ func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) {
                skip = 1
        }
        if t.NumIn()-skip > 2 || t.NumIn() == skip {
-               return nil, false
+               return nil, false, nil
        }
 
        for i := skip; i < t.NumIn(); i++ {
-               if !isOutParam(t.In(i)) {
-                       return nil, false
+               if ok, err := isOutParam(t.In(i)); !ok {
+                       return nil, false, errors.Wrap(err, 
errIllegalParametersInIter)
                }
                ret = append(ret, t.In(i).Elem())
        }
-       return ret, true
+       return ret, true, nil
 }
 
-func isOutParam(t reflect.Type) bool {
+func isOutParam(t reflect.Type) (bool, error) {
        if t.Kind() != reflect.Ptr {
-               return false
+               return false, errors.Errorf("Type %v of kind %v not allowed, 
must be ptr type", t, t.Kind())
        }
-       return typex.IsConcrete(t.Elem()) || typex.IsUniversal(t.Elem()) || 
typex.IsContainer(t.Elem())
+       if typex.IsUniversal(t.Elem()) || typex.IsContainer(t.Elem()) {
+               return true, nil
+       }
+       return typex.CheckConcrete(t.Elem())
 }
 
 // IsReIter returns true iff the supplied type is a functional iterator generator.
@@ -88,15 +113,34 @@ func IsReIter(t reflect.Type) bool {
        return ok
 }
 
+// IsMalformedReIter returns true iff the supplied type is an illegal functional
+// iterator generator and an error explaining why it is illegal. An iterator generator
+// is not legal if its output is not of type iterator. If the type does not
+// have the structure of an iterator generator or it is a legal iterator generator,
+// IsMalformedReIter returns false and no error.
+func IsMalformedReIter(t reflect.Type) (bool, error) {
+       _, _, err := unfoldReIter(t)
+       return err != nil, err
+}
+
 // UnfoldReIter returns the parameter types, if a functional iterator generator.
 func UnfoldReIter(t reflect.Type) ([]reflect.Type, bool) {
+       types, ok, _ := unfoldReIter(t)
+       return types, ok
+}
+
+func unfoldReIter(t reflect.Type) ([]reflect.Type, bool, error) {
        if t.Kind() != reflect.Func {
-               return nil, false
+               return nil, false, nil
        }
        if t.NumIn() != 0 || t.NumOut() != 1 {
-               return nil, false
+               return nil, false, nil
+       }
+       types, ok, err := unfoldIter(t.Out(0))
+       if err != nil {
+               err = errors.Wrap(err, errIllegalParametersInReIter)
        }
-       return UnfoldIter(t.Out(0))
+       return types, ok, err
 }
 
 // IsMultiMap returns true iff the supplied type is a keyed functional iterator
@@ -109,17 +153,32 @@ func IsMultiMap(t reflect.Type) bool {
        return ok
 }
 
+// IsMalformedMultiMap returns true iff the supplied type is an illegal keyed functional
+// iterator generator and an error explaining why it is illegal. A keyed iterator generator
+// is not legal if its output is not of type iterator. If the type does not have the
+// structure of a keyed iterator generator or it is a legal iterator generator,
+// IsMalformedMultiMap returns false and no error.
+func IsMalformedMultiMap(t reflect.Type) (bool, error) {
+       _, _, err := unfoldMultiMap(t)
+       return err != nil, err
+}
+
 // UnfoldMultiMap returns the parameter types for the input key and the output
 // values iff the type is a keyed functional iterator generator.
 func UnfoldMultiMap(t reflect.Type) ([]reflect.Type, bool) {
+       types, ok, _ := unfoldMultiMap(t)
+       return types, ok
+}
+
+func unfoldMultiMap(t reflect.Type) ([]reflect.Type, bool, error) {
        if t.Kind() != reflect.Func {
-               return nil, false
+               return nil, false, nil
        }
        if t.NumIn() != 1 || t.NumOut() != 1 {
-               return nil, false
+               return nil, false, nil
        }
        types := []reflect.Type{t.In(0)}
-       iterTypes, is := UnfoldIter(t.Out(0))
+       iterTypes, is, err := unfoldIter(t.Out(0))
        types = append(types, iterTypes...)
-       return types, is
+       return types, is, errors.Wrap(err, errIllegalParametersInMultiMap)
 }

Reply via email to