zeroshade commented on code in PR #830:
URL: https://github.com/apache/arrow-go/pull/830#discussion_r3312549919


##########
arrow/avro/schema.go:
##########
@@ -277,22 +335,46 @@ func iterateFields(n *schemaNode) {
        n.arrowField = buildArrowField(n, arrow.StructOf(fields...), md)
 }
 
-func isLogicalSchemaType(s avro.Schema) bool {
-       lts, ok := s.(avro.LogicalTypeSchema)
-       if !ok {
-               return false
+// nullableBranch returns the non-null branch of a two-element ["null", T]
+// union, plus true if the union is in that nullable shape. If the union has
+// more than two branches or no null branch, ok is false.
+//
+// Heterogeneous non-nullable unions (e.g. ["null", "int", "string"] or
+// ["int", "string"]) are not supported and callers panic on them rather
+// than silently picking one arm.
+func nullableBranch(s avro.SchemaNode) (avro.SchemaNode, bool) {
+       if s.Type != "union" || len(s.Branches) < 2 {
+               return avro.SchemaNode{}, false
+       }
+       var nonNull *avro.SchemaNode
+       for i := range s.Branches {
+               b := s.Branches[i]

Review Comment:
   ```suggestion
        for _, b := range s.Branches {
   ```



##########
arrow/avro/reader_types.go:
##########
@@ -574,126 +539,108 @@ func mapFieldBuilders(b array.Builder, field 
arrow.Field, parent *fieldPos) {
                }
        case *array.Time32Builder:
                f.appendFunc = func(data interface{}) error {
-                       appendTime32Data(bt, data)
-                       return nil
+                       return appendTime32Data(bt, data)
                }
        case *array.Time64Builder:
                f.appendFunc = func(data interface{}) error {
-                       appendTime64Data(bt, data)
-                       return nil
+                       return appendTime64Data(bt, data)
                }
        case *array.TimestampBuilder:
                f.appendFunc = func(data interface{}) error {
-                       appendTimestampData(bt, data)
-                       return nil
+                       return appendTimestampData(bt, data)
                }
        }
 }
 
-func appendBinaryData(b *array.BinaryBuilder, data interface{}) {
+// appendUUIDData accepts the two shapes a UUID may arrive as: a [16]byte
+// (fixed(16)+uuid) or a hex-dash string (string+uuid). Other byte lengths
+// are rejected rather than re-interpreted.
+func appendUUIDData(b *extensions.UUIDBuilder, data any, fieldName string) 
error {
        switch dt := data.(type) {
        case nil:
                b.AppendNull()
-       case map[string]any:
-               switch ct := dt["bytes"].(type) {
-               case nil:
-                       b.AppendNull()
+       case string:
+               return b.AppendValueFromString(dt)
+       case [16]byte:
+               b.AppendBytes(dt)
+       case []byte:
+               switch len(dt) {
+               case 16:
+                       b.AppendBytes([16]byte(dt))
+               case 36:
+                       return b.AppendValueFromString(string(dt))
                default:
-                       b.Append(ct.([]byte))
+                       return fmt.Errorf("avro: %d-byte value cannot be a UUID 
for column %q", len(dt), fieldName)
                }
        default:
-               b.Append(fmt.Append([]byte{}, data))
+               return fmt.Errorf("avro: unsupported value of type %T for UUID 
column %q", data, fieldName)
        }
+       return nil
 }
 
-func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data interface{}) {
+func appendBinaryData(b *array.BinaryBuilder, data interface{}) error {
+       switch dt := data.(type) {
+       case nil:
+               b.AppendNull()
+       case []byte:
+               b.Append(dt)
+       default:
+               return fmt.Errorf("avro: unsupported value of type %T for 
Binary column", data)
+       }
+       return nil
+}
+
+func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data interface{}) 
error {
        switch dt := data.(type) {
        case nil:
                b.AppendNull()
        case string:
-               b.AppendString(dt)
-       case map[string]any:
-               switch v := dt["string"].(type) {
-               case nil:
-                       b.AppendNull()
-               case string:
-                       b.AppendString(v)
+               if err := b.AppendString(dt); err != nil {
+                       return fmt.Errorf("avro: enum symbol %q is not in the 
dictionary (schema/data mismatch?): %w", dt, err)
                }
+       default:
+               return fmt.Errorf("avro: unsupported value of type %T for 
Dictionary column", data)
        }
+       return nil
 }
 
-func appendBoolData(b *array.BooleanBuilder, data interface{}) {
+func appendBoolData(b *array.BooleanBuilder, data interface{}) error {
        switch dt := data.(type) {
        case nil:
                b.AppendNull()
        case bool:
                b.Append(dt)
-       case map[string]any:
-               switch v := dt["boolean"].(type) {
-               case nil:
-                       b.AppendNull()
-               case bool:
-                       b.Append(v)
-               }
+       default:
+               return fmt.Errorf("avro: unsupported value of type %T for 
Boolean column", data)
        }
+       return nil
 }
 
-func appendDate32Data(b *array.Date32Builder, data interface{}) {
+func appendDate32Data(b *array.Date32Builder, data interface{}) error {
        switch dt := data.(type) {
        case nil:
                b.AppendNull()
-       case int32:
-               b.Append(arrow.Date32(dt))
-       case map[string]any:
-               switch v := dt["int"].(type) {
-               case nil:
-                       b.AppendNull()
-               case int32:
-                       b.Append(arrow.Date32(v))
-               }
        case time.Time:
                b.Append(arrow.Date32FromTime(dt))
+       default:
+               return fmt.Errorf("avro: unsupported value of type %T for 
Date32 column", data)
        }
+       return nil
 }
 
 func appendDecimal128Data(b *array.Decimal128Builder, data interface{}, typ 
arrow.DecimalType) error {
        switch dt := data.(type) {
        case nil:
                b.AppendNull()
-       case []byte:
-               buf := bytes.NewBuffer(dt)
-               if len(dt) <= 38 {
-                       var intData int64
-                       err := binary.Read(buf, binary.BigEndian, &intData)
-                       if err != nil {
-                               return err
-                       }
-                       b.Append(decimal128.FromI64(intData))
-               } else {
-                       var bigIntData big.Int
-                       
b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
-               }
-       case map[string]any:

Review Comment:
   If anyone was using custom decoders that emitted `[]byte`, 
`map[string]any`, etc., they are now going to see "unsupported value of type X" 
errors where the old code would have accepted the input. Let's update the 
PR description (and therefore the resulting changelog later) to note 
this change in behavior for anyone who was using custom decoders.



##########
arrow/avro/reader_types.go:
##########
@@ -574,126 +539,108 @@ func mapFieldBuilders(b array.Builder, field 
arrow.Field, parent *fieldPos) {
                }
        case *array.Time32Builder:
                f.appendFunc = func(data interface{}) error {
-                       appendTime32Data(bt, data)
-                       return nil
+                       return appendTime32Data(bt, data)
                }
        case *array.Time64Builder:
                f.appendFunc = func(data interface{}) error {
-                       appendTime64Data(bt, data)
-                       return nil
+                       return appendTime64Data(bt, data)
                }
        case *array.TimestampBuilder:
                f.appendFunc = func(data interface{}) error {
-                       appendTimestampData(bt, data)
-                       return nil
+                       return appendTimestampData(bt, data)
                }
        }
 }
 
-func appendBinaryData(b *array.BinaryBuilder, data interface{}) {
+// appendUUIDData accepts the two shapes a UUID may arrive as: a [16]byte
+// (fixed(16)+uuid) or a hex-dash string (string+uuid). Other byte lengths
+// are rejected rather than re-interpreted.
+func appendUUIDData(b *extensions.UUIDBuilder, data any, fieldName string) 
error {
        switch dt := data.(type) {
        case nil:
                b.AppendNull()
-       case map[string]any:
-               switch ct := dt["bytes"].(type) {
-               case nil:
-                       b.AppendNull()
+       case string:
+               return b.AppendValueFromString(dt)
+       case [16]byte:
+               b.AppendBytes(dt)
+       case []byte:
+               switch len(dt) {
+               case 16:
+                       b.AppendBytes([16]byte(dt))
+               case 36:
+                       return b.AppendValueFromString(string(dt))

Review Comment:
   Let's add a comment here explaining why we have a case for 36 (I guess it's 
the hex-dash UUID string form, i.e. 32 hex digits plus 4 hyphens?)



##########
arrow/avro/schema.go:
##########
@@ -56,33 +67,84 @@ func (node *schemaNode) schemaPath() string {
        return path
 }
 
-func (node *schemaNode) newChild(n string, s avro.Schema) *schemaNode {
+func (node *schemaNode) newChild(n string, s avro.SchemaNode) *schemaNode {
        child := &schemaNode{
-               name:        n,
-               parent:      node,
-               schema:      s,
-               schemaCache: node.schemaCache,
-               index:       int32(len(node.childrens)),
-               depth:       node.depth + 1,
+               name:       n,
+               parent:     node,
+               node:       s,
+               namedCache: node.namedCache,
+               index:      int32(len(node.childrens)),
        }
        node.childrens = append(node.childrens, child)
        return child
 }
 func (node *schemaNode) children() []*schemaNode { return node.childrens }
 
-// func (node *schemaNode) nodeName() string { return node.name }
+// rememberNamed adds a record/enum/fixed SchemaNode to the named-type cache
+// under both its short name and (if a namespace is present) its full name,
+// so later references like {"type": "Address"} or {"type": "ns.Address"}
+// resolve back to the original definition.
+func (node *schemaNode) rememberNamed(s avro.SchemaNode) {
+       if s.Name == "" {
+               return
+       }
+       node.namedCache[s.Name] = s
+       if s.Namespace != "" {
+               node.namedCache[s.Namespace+"."+s.Name] = s
+       }

Review Comment:
   It's worth noting that the hamba SchemaCache is keyed on full name only, so 
for schemas with two records named `Foo` in different namespaces, a bare 
`{"type": "Foo"}` will now resolve to whichever was defined last instead of 
being an error (the previous behavior under hamba).
   
   We should either restrict to full-name lookups or document the change in 
behavior here: a malformed schema may now silently produce something where 
hamba would have errored.



##########
arrow/avro/schema.go:
##########
@@ -56,33 +67,84 @@ func (node *schemaNode) schemaPath() string {
        return path
 }
 
-func (node *schemaNode) newChild(n string, s avro.Schema) *schemaNode {
+func (node *schemaNode) newChild(n string, s avro.SchemaNode) *schemaNode {
        child := &schemaNode{
-               name:        n,
-               parent:      node,
-               schema:      s,
-               schemaCache: node.schemaCache,
-               index:       int32(len(node.childrens)),
-               depth:       node.depth + 1,
+               name:       n,
+               parent:     node,
+               node:       s,
+               namedCache: node.namedCache,
+               index:      int32(len(node.childrens)),
        }
        node.childrens = append(node.childrens, child)
        return child
 }
 func (node *schemaNode) children() []*schemaNode { return node.childrens }
 
-// func (node *schemaNode) nodeName() string { return node.name }
+// rememberNamed adds a record/enum/fixed SchemaNode to the named-type cache
+// under both its short name and (if a namespace is present) its full name,
+// so later references like {"type": "Address"} or {"type": "ns.Address"}
+// resolve back to the original definition.
+func (node *schemaNode) rememberNamed(s avro.SchemaNode) {
+       if s.Name == "" {
+               return
+       }
+       node.namedCache[s.Name] = s
+       if s.Namespace != "" {
+               node.namedCache[s.Namespace+"."+s.Name] = s
+       }
+}
+
+// resolveRef replaces s with its inline definition if s.Type is a named-type
+// reference rather than a builtin Avro type. atField, when non-empty, names
+// the field this reference appears in and is included in the panic so the
+// user can locate the offending entry.
+func (node *schemaNode) resolveRef(s avro.SchemaNode, atField string) 
avro.SchemaNode {
+       if _, ok := builtinAvroTypes[s.Type]; ok {
+               return s
+       }
+       if def, ok := node.namedCache[s.Type]; ok {
+               return def
+       }
+       loc := node.schemaPath()
+       if atField != "" {
+               loc += "." + atField
+       }
+       panic(fmt.Errorf("unknown named type %q referenced at %s", s.Type, loc))
+}
 
-// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema
-func ArrowSchemaFromAvro(schema avro.Schema) (s *arrow.Schema, err error) {
+// ArrowSchemaFromAvroJSON parses an Avro schema given as JSON text and returns
+// the equivalent Arrow schema.
+func ArrowSchemaFromAvroJSON(schemaJSON string) (*arrow.Schema, error) {
+       schema, err := avro.Parse(schemaJSON)
+       if err != nil {
+               return nil, err
+       }
+       return arrowSchemaFromAvroInternal(schema)
+}
+
+// ArrowSchemaFromAvro returns a new Arrow schema from a parsed Avro schema.
+//
+// Deprecated: Use [ArrowSchemaFromAvroJSON] instead — it does not couple
+// callers to a particular Avro library through its signature.
+func ArrowSchemaFromAvro(schema hambaAvro.Schema) (*arrow.Schema, error) {
+       js, err := json.Marshal(schema)
+       if err != nil {
+               return nil, fmt.Errorf("%w: could not serialize hamba avro 
schema: %w", arrow.ErrInvalid, err)
+       }
+       return ArrowSchemaFromAvroJSON(string(js))
+}
+
+func arrowSchemaFromAvroInternal(schema *avro.Schema) (s *arrow.Schema, err 
error) {
        defer func() {
                if r := recover(); r != nil {
                        s = nil
                        err = utils.FormatRecoveredError("invalid avro schema", 
r)
                }
        }()
+       root := schema.Root()
        n := newSchemaNode()
-       n.schema = schema
-       c := n.newChild(n.schema.(avro.NamedSchema).Name(), n.schema)
+       n.node = root
+       c := n.newChild(root.Name, root)

Review Comment:
   OCF requires a named record at the top, so we should probably add a guard or 
an explicit "must be named record" check here (the hamba code would have 
panicked).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to