youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor 
DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r388089432
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,58 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about 
which
 // sideinputs are present). Bind should respect that.
 
+// Constants so we can avoid magic numbers in validation. Represent number of
+// DoFn main inputs based on what kind of input the DoFn has.
+const (
+       unknownInNum = -1 // Used when we don't know the number of main inputs.
+       singleInNum  = 1
+       kvInNum      = 2
+)
+
 // NewDoFn constructs a DoFn from the given value, if possible.
 func NewDoFn(fn interface{}) (*DoFn, error) {
        ret, err := NewFn(fn)
        if err != nil {
                return nil, errors.WithContext(errors.Wrapf(err, "invalid 
DoFn"), "constructing DoFn")
        }
-       return AsDoFn(ret)
+       return AsDoFn(ret, unknownInNum)
 }
 
-// AsDoFn converts a Fn to a DoFn, if possible.
-func AsDoFn(fn *Fn) (*DoFn, error) {
+// NewDoFnKv constructs a DoFn from the given value, if possible, with
+// improved validation from knowing whether the DoFn's main input is a KV or
+// single element.
+func NewDoFnKv(fn interface{}, mainKv bool) (*DoFn, error) {
+       ret, err := NewFn(fn)
+       if err != nil {
+               return nil, errors.WithContext(errors.Wrapf(err, "invalid 
DoFn"), "constructing DoFn")
+       }
+
+       if mainKv {
+               return AsDoFn(ret, kvInNum)
+       } else {
+               return AsDoFn(ret, singleInNum)
+       }
+}
+
+// AsDoFn converts a Fn to a DoFn, if possible. numMainIn specifies how many
+// main inputs are expected in the DoFn's method signatures. Valid values are
+// -1 (unknown), 1 (single elements), or 2 (KVs). If the value is unknown then
+// validation is done by best effort and may miss some edge cases.
+func AsDoFn(fn *Fn, numMainIn int) (*DoFn, error) {
        addContext := func(err error, fn *Fn) error {
                return errors.WithContextf(err, "graph.AsDoFn: for Fn named 
%v", fn.Name())
        }
 
+       // Validate numMainIn. This check should match this method's comment.
+       if numMainIn != unknownInNum &&
+               numMainIn != singleInNum &&
+               numMainIn != kvInNum {
+               err := errors.Errorf("invalid number of main inputs given. "+
+                       "Got: %v, Want: One of the following: %v",
+                       processElementName, []int{unknownInNum, singleInNum, 
kvInNum})
+               return nil, addContext(err, fn)
+       }
 
 Review comment:
   I like that much better, done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to