zeroshade commented on code in PR #830:
URL: https://github.com/apache/arrow-go/pull/830#discussion_r3312549919
##########
arrow/avro/schema.go:
##########
@@ -277,22 +335,46 @@ func iterateFields(n *schemaNode) {
n.arrowField = buildArrowField(n, arrow.StructOf(fields...), md)
}
-func isLogicalSchemaType(s avro.Schema) bool {
- lts, ok := s.(avro.LogicalTypeSchema)
- if !ok {
- return false
+// nullableBranch returns the non-null branch of a two-element ["null", T]
+// union, plus true if the union is in that nullable shape. If the union has
+// more than two branches or no null branch, ok is false.
+//
+// Heterogeneous non-nullable unions (e.g. ["null", "int", "string"] or
+// ["int", "string"]) are not supported and callers panic on them rather
+// than silently picking one arm.
+func nullableBranch(s avro.SchemaNode) (avro.SchemaNode, bool) {
+ if s.Type != "union" || len(s.Branches) < 2 {
+ return avro.SchemaNode{}, false
+ }
+ var nonNull *avro.SchemaNode
+ for i := range s.Branches {
+ b := s.Branches[i]
Review Comment:
```suggestion
for _, b := range s.Branches {
```
##########
arrow/avro/reader_types.go:
##########
@@ -574,126 +539,108 @@ func mapFieldBuilders(b array.Builder, field
arrow.Field, parent *fieldPos) {
}
case *array.Time32Builder:
f.appendFunc = func(data interface{}) error {
- appendTime32Data(bt, data)
- return nil
+ return appendTime32Data(bt, data)
}
case *array.Time64Builder:
f.appendFunc = func(data interface{}) error {
- appendTime64Data(bt, data)
- return nil
+ return appendTime64Data(bt, data)
}
case *array.TimestampBuilder:
f.appendFunc = func(data interface{}) error {
- appendTimestampData(bt, data)
- return nil
+ return appendTimestampData(bt, data)
}
}
}
-func appendBinaryData(b *array.BinaryBuilder, data interface{}) {
+// appendUUIDData accepts the two shapes a UUID may arrive as: a [16]byte
+// (fixed(16)+uuid) or a hex-dash string (string+uuid). Other byte lengths
+// are rejected rather than re-interpreted.
+func appendUUIDData(b *extensions.UUIDBuilder, data any, fieldName string)
error {
switch dt := data.(type) {
case nil:
b.AppendNull()
- case map[string]any:
- switch ct := dt["bytes"].(type) {
- case nil:
- b.AppendNull()
+ case string:
+ return b.AppendValueFromString(dt)
+ case [16]byte:
+ b.AppendBytes(dt)
+ case []byte:
+ switch len(dt) {
+ case 16:
+ b.AppendBytes([16]byte(dt))
+ case 36:
+ return b.AppendValueFromString(string(dt))
default:
- b.Append(ct.([]byte))
+ return fmt.Errorf("avro: %d-byte value cannot be a UUID
for column %q", len(dt), fieldName)
}
default:
- b.Append(fmt.Append([]byte{}, data))
+ return fmt.Errorf("avro: unsupported value of type %T for UUID
column %q", data, fieldName)
}
+ return nil
}
-func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data interface{}) {
+func appendBinaryData(b *array.BinaryBuilder, data interface{}) error {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case []byte:
+ b.Append(dt)
+ default:
+ return fmt.Errorf("avro: unsupported value of type %T for
Binary column", data)
+ }
+ return nil
+}
+
+func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data interface{})
error {
switch dt := data.(type) {
case nil:
b.AppendNull()
case string:
- b.AppendString(dt)
- case map[string]any:
- switch v := dt["string"].(type) {
- case nil:
- b.AppendNull()
- case string:
- b.AppendString(v)
+ if err := b.AppendString(dt); err != nil {
+ return fmt.Errorf("avro: enum symbol %q is not in the
dictionary (schema/data mismatch?): %w", dt, err)
}
+ default:
+ return fmt.Errorf("avro: unsupported value of type %T for
Dictionary column", data)
}
+ return nil
}
-func appendBoolData(b *array.BooleanBuilder, data interface{}) {
+func appendBoolData(b *array.BooleanBuilder, data interface{}) error {
switch dt := data.(type) {
case nil:
b.AppendNull()
case bool:
b.Append(dt)
- case map[string]any:
- switch v := dt["boolean"].(type) {
- case nil:
- b.AppendNull()
- case bool:
- b.Append(v)
- }
+ default:
+ return fmt.Errorf("avro: unsupported value of type %T for
Boolean column", data)
}
+ return nil
}
-func appendDate32Data(b *array.Date32Builder, data interface{}) {
+func appendDate32Data(b *array.Date32Builder, data interface{}) error {
switch dt := data.(type) {
case nil:
b.AppendNull()
- case int32:
- b.Append(arrow.Date32(dt))
- case map[string]any:
- switch v := dt["int"].(type) {
- case nil:
- b.AppendNull()
- case int32:
- b.Append(arrow.Date32(v))
- }
case time.Time:
b.Append(arrow.Date32FromTime(dt))
+ default:
+ return fmt.Errorf("avro: unsupported value of type %T for
Date32 column", data)
}
+ return nil
}
func appendDecimal128Data(b *array.Decimal128Builder, data interface{}, typ
arrow.DecimalType) error {
switch dt := data.(type) {
case nil:
b.AppendNull()
- case []byte:
- buf := bytes.NewBuffer(dt)
- if len(dt) <= 38 {
- var intData int64
- err := binary.Read(buf, binary.BigEndian, &intData)
- if err != nil {
- return err
- }
- b.Append(decimal128.FromI64(intData))
- } else {
- var bigIntData big.Int
-
b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
- }
- case map[string]any:
Review Comment:
If anyone was using custom decoders that emitted `[]byte`,
`map[string]any`, etc., they are now going to see "unsupported value of type X"
errors where the old code would have accepted the input. Let's update the
PR description (and therefore the resulting changelog) to note
this change in behavior for anyone who was using custom decoders.
##########
arrow/avro/reader_types.go:
##########
@@ -574,126 +539,108 @@ func mapFieldBuilders(b array.Builder, field
arrow.Field, parent *fieldPos) {
}
case *array.Time32Builder:
f.appendFunc = func(data interface{}) error {
- appendTime32Data(bt, data)
- return nil
+ return appendTime32Data(bt, data)
}
case *array.Time64Builder:
f.appendFunc = func(data interface{}) error {
- appendTime64Data(bt, data)
- return nil
+ return appendTime64Data(bt, data)
}
case *array.TimestampBuilder:
f.appendFunc = func(data interface{}) error {
- appendTimestampData(bt, data)
- return nil
+ return appendTimestampData(bt, data)
}
}
}
-func appendBinaryData(b *array.BinaryBuilder, data interface{}) {
+// appendUUIDData accepts the two shapes a UUID may arrive as: a [16]byte
+// (fixed(16)+uuid) or a hex-dash string (string+uuid). Other byte lengths
+// are rejected rather than re-interpreted.
+func appendUUIDData(b *extensions.UUIDBuilder, data any, fieldName string)
error {
switch dt := data.(type) {
case nil:
b.AppendNull()
- case map[string]any:
- switch ct := dt["bytes"].(type) {
- case nil:
- b.AppendNull()
+ case string:
+ return b.AppendValueFromString(dt)
+ case [16]byte:
+ b.AppendBytes(dt)
+ case []byte:
+ switch len(dt) {
+ case 16:
+ b.AppendBytes([16]byte(dt))
+ case 36:
+ return b.AppendValueFromString(string(dt))
Review Comment:
Let's add a comment here explaining why we have a case for 36 (I guess it is
for a hex-dash UUID string passed as bytes?)
##########
arrow/avro/schema.go:
##########
@@ -56,33 +67,84 @@ func (node *schemaNode) schemaPath() string {
return path
}
-func (node *schemaNode) newChild(n string, s avro.Schema) *schemaNode {
+func (node *schemaNode) newChild(n string, s avro.SchemaNode) *schemaNode {
child := &schemaNode{
- name: n,
- parent: node,
- schema: s,
- schemaCache: node.schemaCache,
- index: int32(len(node.childrens)),
- depth: node.depth + 1,
+ name: n,
+ parent: node,
+ node: s,
+ namedCache: node.namedCache,
+ index: int32(len(node.childrens)),
}
node.childrens = append(node.childrens, child)
return child
}
func (node *schemaNode) children() []*schemaNode { return node.childrens }
-// func (node *schemaNode) nodeName() string { return node.name }
+// rememberNamed adds a record/enum/fixed SchemaNode to the named-type cache
+// under both its short name and (if a namespace is present) its full name,
+// so later references like {"type": "Address"} or {"type": "ns.Address"}
+// resolve back to the original definition.
+func (node *schemaNode) rememberNamed(s avro.SchemaNode) {
+ if s.Name == "" {
+ return
+ }
+ node.namedCache[s.Name] = s
+ if s.Namespace != "" {
+ node.namedCache[s.Namespace+"."+s.Name] = s
+ }
Review Comment:
It's worth noting that the hamba SchemaCache keyed on full name only, so for
schemas with two records named `Foo` in different namespaces, a bare `{"type":
"Foo"}` will now resolve to whichever was defined last, as opposed to being an
error (the previous behavior under hamba).
We should either restrict to full-name lookups or document the change in
behavior here: a malformed schema may now silently produce a result
where hamba would have errored.
##########
arrow/avro/schema.go:
##########
@@ -56,33 +67,84 @@ func (node *schemaNode) schemaPath() string {
return path
}
-func (node *schemaNode) newChild(n string, s avro.Schema) *schemaNode {
+func (node *schemaNode) newChild(n string, s avro.SchemaNode) *schemaNode {
child := &schemaNode{
- name: n,
- parent: node,
- schema: s,
- schemaCache: node.schemaCache,
- index: int32(len(node.childrens)),
- depth: node.depth + 1,
+ name: n,
+ parent: node,
+ node: s,
+ namedCache: node.namedCache,
+ index: int32(len(node.childrens)),
}
node.childrens = append(node.childrens, child)
return child
}
func (node *schemaNode) children() []*schemaNode { return node.childrens }
-// func (node *schemaNode) nodeName() string { return node.name }
+// rememberNamed adds a record/enum/fixed SchemaNode to the named-type cache
+// under both its short name and (if a namespace is present) its full name,
+// so later references like {"type": "Address"} or {"type": "ns.Address"}
+// resolve back to the original definition.
+func (node *schemaNode) rememberNamed(s avro.SchemaNode) {
+ if s.Name == "" {
+ return
+ }
+ node.namedCache[s.Name] = s
+ if s.Namespace != "" {
+ node.namedCache[s.Namespace+"."+s.Name] = s
+ }
+}
+
+// resolveRef replaces s with its inline definition if s.Type is a named-type
+// reference rather than a builtin Avro type. atField, when non-empty, names
+// the field this reference appears in and is included in the panic so the
+// user can locate the offending entry.
+func (node *schemaNode) resolveRef(s avro.SchemaNode, atField string)
avro.SchemaNode {
+ if _, ok := builtinAvroTypes[s.Type]; ok {
+ return s
+ }
+ if def, ok := node.namedCache[s.Type]; ok {
+ return def
+ }
+ loc := node.schemaPath()
+ if atField != "" {
+ loc += "." + atField
+ }
+ panic(fmt.Errorf("unknown named type %q referenced at %s", s.Type, loc))
+}
-// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema
-func ArrowSchemaFromAvro(schema avro.Schema) (s *arrow.Schema, err error) {
+// ArrowSchemaFromAvroJSON parses an Avro schema given as JSON text and returns
+// the equivalent Arrow schema.
+func ArrowSchemaFromAvroJSON(schemaJSON string) (*arrow.Schema, error) {
+ schema, err := avro.Parse(schemaJSON)
+ if err != nil {
+ return nil, err
+ }
+ return arrowSchemaFromAvroInternal(schema)
+}
+
+// ArrowSchemaFromAvro returns a new Arrow schema from a parsed Avro schema.
+//
+// Deprecated: Use [ArrowSchemaFromAvroJSON] instead — it does not couple
+// callers to a particular Avro library through its signature.
+func ArrowSchemaFromAvro(schema hambaAvro.Schema) (*arrow.Schema, error) {
+ js, err := json.Marshal(schema)
+ if err != nil {
+ return nil, fmt.Errorf("%w: could not serialize hamba avro
schema: %w", arrow.ErrInvalid, err)
+ }
+ return ArrowSchemaFromAvroJSON(string(js))
+}
+
+func arrowSchemaFromAvroInternal(schema *avro.Schema) (s *arrow.Schema, err
error) {
defer func() {
if r := recover(); r != nil {
s = nil
err = utils.FormatRecoveredError("invalid avro schema",
r)
}
}()
+ root := schema.Root()
n := newSchemaNode()
- n.schema = schema
- c := n.newChild(n.schema.(avro.NamedSchema).Name(), n.schema)
+ n.node = root
+ c := n.newChild(root.Name, root)
Review Comment:
OCF requires a named record at the top, so we should probably add a guard or
an explicit "must be named record" check here (the hamba code would have panicked).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]