youngoli commented on a change in pull request #13940:
URL: https://github.com/apache/beam/pull/13940#discussion_r574921728
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -102,52 +159,104 @@ func (r *Registry) registerType(ut reflect.Type, seen
map[reflect.Type]struct{})
if lID, ok := r.logicalTypeIdentifiers[t]; ok {
lt := r.logicalTypes[lID]
r.addToMaps(lt.StorageType(), t)
- return
+ return nil
}
- for _, lti := range r.logicalTypeInterfaces {
- if !t.Implements(lti) {
- continue
- }
+
+ useProvider := func(t, lti reflect.Type) (bool, error) {
p := r.logicalTypeProviders[lti]
st, err := p(t)
if err != nil {
- panic(errors.Wrapf(err, "unable to convert
LogicalType[%v] using provider for %v", t, lti))
+ return false, errors.Wrapf(err, "unable to convert
LogicalType[%v] using provider for %v", t, lti)
}
if st == nil {
- continue
+ return false, nil
}
r.RegisterLogicalType(ToLogicalType(t.String(), t, st))
r.addToMaps(st, t)
- return
+ return true, nil
+ }
+
+ for _, lti := range r.logicalTypeInterfaces {
+ if t.Implements(lti) {
+ ok, err := useProvider(t, lti)
+ if err != nil {
+ return err
+ }
+ if ok {
+ return nil
+ }
+ }
+ if t := reflect.PtrTo(t); t.Implements(lti) {
+ ok, err := useProvider(t, lti)
+ if err != nil {
+ return err
+ }
+ if ok {
+ return nil
+ }
+ }
}
switch t.Kind() {
case reflect.Map:
- r.registerType(t.Key(), seen)
+ if err := r.registerType(t.Key(), seen); err != nil {
+ return err
+ }
fallthrough
case reflect.Array, reflect.Slice, reflect.Ptr:
- r.registerType(t.Elem(), seen)
- return
+ if err := r.registerType(t.Elem(), seen); err != nil {
+ return errors.Wrapf(err, "type is of kind %v", t.Kind())
+ }
+ return nil
+ case reflect.Interface, reflect.Func, reflect.Chan, reflect.Invalid,
reflect.UnsafePointer, reflect.Uintptr:
+ // Ignore these, as they can't be serialized.
+ return nil
+ case reflect.Complex64, reflect.Complex128:
+ // TODO(BEAM-9615): Support complex number types.
+ return nil
case reflect.Struct: // What we expect here.
default:
- return
+ rt := reflectKindToTypeMap[t.Kind()]
+ // It's only a logical type if it's not a built in primitive
type.
+ if t != rt {
+ st, ok := reflectKindToTypeMap[t.Kind()]
+ if !ok {
+ fmt.Printf("\n nil type to RegisterLogicalType
for t %v\n", t)
Review comment:
Two comments here:
1. The printf looks like it's left over from debugging. Should it be removed
or changed to an error/log message?
2. Maybe you could simplify this section of the code (it currently looks up
`reflectKindToTypeMap[t.Kind()]` twice) by structuring it like so:
```
rt, ok := reflectKindToTypeMap[t.Kind()]
if !ok {
...
return nil
}
if t != rt {
r.RegisterLogicalType(...)
}
return nil
```
##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -47,7 +46,7 @@ func RegisterDoFn(dofn interface{}) {
}
for _, t := range ts {
runtime.RegisterType(t)
- schema.RegisterType(t)
+ // schema.RegisterType(t)
Review comment:
Nit: This looks like it can be deleted, unless you want it there as a
reminder for a future change, in which case it could have a TODO added to it.
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -102,52 +159,104 @@ func (r *Registry) registerType(ut reflect.Type, seen
map[reflect.Type]struct{})
if lID, ok := r.logicalTypeIdentifiers[t]; ok {
lt := r.logicalTypes[lID]
r.addToMaps(lt.StorageType(), t)
- return
+ return nil
}
- for _, lti := range r.logicalTypeInterfaces {
- if !t.Implements(lti) {
- continue
- }
+
+ useProvider := func(t, lti reflect.Type) (bool, error) {
p := r.logicalTypeProviders[lti]
st, err := p(t)
if err != nil {
- panic(errors.Wrapf(err, "unable to convert
LogicalType[%v] using provider for %v", t, lti))
+ return false, errors.Wrapf(err, "unable to convert
LogicalType[%v] using provider for %v", t, lti)
}
if st == nil {
- continue
+ return false, nil
}
r.RegisterLogicalType(ToLogicalType(t.String(), t, st))
r.addToMaps(st, t)
- return
+ return true, nil
+ }
+
+ for _, lti := range r.logicalTypeInterfaces {
+ if t.Implements(lti) {
+ ok, err := useProvider(t, lti)
+ if err != nil {
+ return err
+ }
+ if ok {
+ return nil
+ }
+ }
+ if t := reflect.PtrTo(t); t.Implements(lti) {
+ ok, err := useProvider(t, lti)
+ if err != nil {
+ return err
+ }
+ if ok {
+ return nil
+ }
+ }
}
switch t.Kind() {
case reflect.Map:
- r.registerType(t.Key(), seen)
+ if err := r.registerType(t.Key(), seen); err != nil {
+ return err
+ }
fallthrough
case reflect.Array, reflect.Slice, reflect.Ptr:
- r.registerType(t.Elem(), seen)
- return
+ if err := r.registerType(t.Elem(), seen); err != nil {
+ return errors.Wrapf(err, "type is of kind %v", t.Kind())
+ }
+ return nil
+ case reflect.Interface, reflect.Func, reflect.Chan, reflect.Invalid,
reflect.UnsafePointer, reflect.Uintptr:
+ // Ignore these, as they can't be serialized.
+ return nil
+ case reflect.Complex64, reflect.Complex128:
+ // TODO(BEAM-9615): Support complex number types.
+ return nil
case reflect.Struct: // What we expect here.
default:
- return
+ rt := reflectKindToTypeMap[t.Kind()]
+ // It's only a logical type if it's not a built in primitive
type.
+ if t != rt {
+ st, ok := reflectKindToTypeMap[t.Kind()]
+ if !ok {
+ fmt.Printf("\n nil type to RegisterLogicalType
for t %v\n", t)
+ return nil
+ }
+ r.RegisterLogicalType(ToLogicalType(t.String(), t, st))
+ }
+ return nil
}
runtime.RegisterType(ut)
for i := 0; i < t.NumField(); i++ {
sf := ut.Field(i)
- r.registerType(sf.Type, seen)
+ isUnexported := sf.PkgPath != ""
+ if isUnexported {
+ // Schemas can't handle unexported fields at all.
+ continue
+ }
+ if err := r.registerType(sf.Type, seen); err != nil {
+ return errors.Wrapf(err, "registering type for field %v
in %v", sf.Name, ut)
+ }
+ if implements(sf.Type, sdfRtrackerType) {
Review comment:
This line looks like it should be above the `r.registerType` call.
To be fair, though, users shouldn't be passing restriction trackers between
transforms at all, much less over fusion boundaries, so I'm not sure how often
this case would even be hit.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org