lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs. URL: https://github.com/apache/beam/pull/10991#discussion_r385893620
##########
File path: sdks/go/pkg/beam/core/graph/fn.go
##########
@@ -209,21 +209,58 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
+// Constants so we can avoid magic numbers in validation. Represent number of
+// DoFn main inputs based on what kind of input the DoFn has.
+const (
+	unknownInNum = -1 // Used when we don't know the number of main inputs.
+	singleInNum = 1
+	kvInNum = 2
+)
+
 // NewDoFn constructs a DoFn from the given value, if possible.
 func NewDoFn(fn interface{}) (*DoFn, error) {
 	ret, err := NewFn(fn)
 	if err != nil {
 		return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
 	}
-	return AsDoFn(ret)
+	return AsDoFn(ret, unknownInNum)
 }
-// AsDoFn converts a Fn to a DoFn, if possible.
-func AsDoFn(fn *Fn) (*DoFn, error) {
+// NewDoFnKv constructs a DoFn from the given value, if possible, with
+// improved validation from knowing whether the DoFn's main input is a KV or
+// single element.
+func NewDoFnKv(fn interface{}, mainKv bool) (*DoFn, error) {
+	ret, err := NewFn(fn)
+	if err != nil {
+		return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
+	}
+
+	if mainKv {
+		return AsDoFn(ret, kvInNum)
+	} else {
+		return AsDoFn(ret, singleInNum)
+	}
+}
+
+// AsDoFn converts a Fn to a DoFn, if possible. numMainIn specifies how many
+// main inputs are expected in the DoFn's method signatures. Valid values are
+// -1 (unknown), 1 (single elements), or 2 (KVs). If the value is unknown then
+// validation is done by best effort and may miss some edge cases.
+func AsDoFn(fn *Fn, numMainIn int) (*DoFn, error) {
 	addContext := func(err error, fn *Fn) error {
 		return errors.WithContextf(err, "graph.AsDoFn: for Fn named %v", fn.Name())
 	}
+	// Validate numMainIn. This check should match this method's comment.
+	if numMainIn != unknownInNum &&
+		numMainIn != singleInNum &&
+		numMainIn != kvInNum {
+		err := errors.Errorf("invalid number of main inputs given. "+
+			"Got: %v, Want: One of the following: %v",
+			processElementName, []int{unknownInNum, singleInNum, kvInNum})
+		return nil, addContext(err, fn)
+	}

Review comment:
Consider a switch instead.

```suggestion
	switch numMainIn {
	case unknownInNum, singleInNum, kvInNum:
		// Valid
	default:
		// Invalid
		err := errors.Errorf("invalid number of main inputs given. "+
			"Got: %v, Want: One of the following: %v",
			processElementName, []int{unknownInNum, singleInNum, kvInNum})
		return nil, addContext(err, fn)
	}
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services