This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new ef130b95 fix(avro): append raw bytes (#850)
ef130b95 is described below

commit ef130b95e8b39282366feda26bd9bf4639b29115
Author: Matan Rosenberg <[email protected]>
AuthorDate: Wed Jun 10 22:56:51 2026 +0300

    fix(avro): append raw bytes (#850)
    
    ### Rationale for this change
    
    `appendBinaryData` has no case for `[]byte`, so a plain bytes value
    falls into the`fmt.Append` fallback, which results in a string-formated
    value.
    
    ```go
    schema := 
`{"type":"record","name":"r","fields":[{"name":"f","type":"bytes"}]}`
    var buf bytes.Buffer
    enc, _ := ocf.NewEncoder(schema, &buf)
    enc.Encode(map[string]any{"f": []byte{0x00, 0x01, 0xfe, 0xff}})
    enc.Close()
    
    r, _ := avro.NewOCFReader(&buf, avro.WithChunk(-1))
    r.Next()
    fmt.Printf("%x\n", r.RecordBatch().Column(0).(*array.Binary).Value(0))
    // got:  5b30203120323534203235355d   — the 13-byte text "[0 1 254 255]"
    // want: 0001feff
    ```
    
    ### What changes are included in this PR?
    
    - `appendBinaryData`: append `[]bytes` as is & append `string` as its
    raw bytes
    - `appendStringData`: handle `[]byte`.
    - `testdata.ByteArray.MarshalJSON`: Remove `fmt.Sprint(<bytes>)` to
    match the new behaviour
    ### Are these changes tested?
    
    Yes. Added regression test: `TestOCFReaderBytesValues` covering plain
    bytes and `["null","bytes"]`
    
    ### Are there any user-facing changes?
    
    Yes. Avro bytes columns previously decoded to corrupted text now decode
    to the actual payload.
---
 arrow/avro/reader_test.go       | 109 ++++++++++++++++++++++++++++++++++++
 arrow/avro/reader_types.go      | 120 +++++++++++++++++++++++++++++-----------
 arrow/avro/testdata/testdata.go |   4 +-
 3 files changed, 198 insertions(+), 35 deletions(-)

diff --git a/arrow/avro/reader_test.go b/arrow/avro/reader_test.go
index 2ba91846..5c57e2d6 100644
--- a/arrow/avro/reader_test.go
+++ b/arrow/avro/reader_test.go
@@ -25,8 +25,11 @@ import (
        "testing"
 
        "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/avro/testdata"
+       "github.com/apache/arrow-go/v18/arrow/memory"
        hamba "github.com/hamba/avro/v2"
+       "github.com/hamba/avro/v2/ocf"
        "github.com/stretchr/testify/assert"
 )
 
@@ -227,3 +230,109 @@ func TestReader(t *testing.T) {
                })
        }
 }
+
+// TestOCFReaderBytesValues exercises avro `bytes` fields, both plain and as a
+// ["null","bytes"] union: hamba hands the decoded value to the appenders as a
+// bare []byte, which previously fell into appendBinaryData's fmt fallback and
+// appended the formatted text (e.g. "[1 2 3]") instead of the payload.
+func TestOCFReaderBytesValues(t *testing.T) {
+       schema := `{
+               "type": "record",
+               "name": "rec",
+               "fields": [
+                       {"name": "plain", "type": "bytes"},
+                       {"name": "nullable", "type": ["null", "bytes"]}
+               ]
+       }`
+       payload := []byte{0x00, 0x01, 0xfe, 0xff}
+
+       var buf bytes.Buffer
+       enc, err := ocf.NewEncoder(schema, &buf)
+       assert.NoError(t, err)
+       assert.NoError(t, enc.Encode(map[string]any{
+               "plain":    payload,
+               "nullable": map[string]any{"bytes": payload},
+       }))
+       assert.NoError(t, enc.Encode(map[string]any{
+               "plain":    []byte{},
+               "nullable": nil,
+       }))
+       assert.NoError(t, enc.Close())
+
+       ar, err := NewOCFReader(bytes.NewReader(buf.Bytes()), WithChunk(-1))
+       assert.NoError(t, err)
+       defer ar.Close()
+
+       assert.True(t, ar.Next())
+       assert.NoError(t, ar.Err())
+       rec := ar.RecordBatch()
+
+       plain := rec.Column(0).(*array.Binary)
+       assert.Equal(t, payload, plain.Value(0))
+       assert.Equal(t, []byte{}, plain.Value(1))
+
+       nullable := rec.Column(1).(*array.Binary)
+       assert.Equal(t, payload, nullable.Value(0))
+       assert.True(t, nullable.IsNull(1))
+}
+
+// Types outside what the hamba decoder produces must error rather than append
+// a fmt-formatted rendering of the value.
+func TestAppendBinaryAndStringDataUnexpectedTypes(t *testing.T) {
+       bb := array.NewBinaryBuilder(memory.DefaultAllocator, 
arrow.BinaryTypes.Binary)
+       defer bb.Release()
+
+       assert.NoError(t, appendBinaryData(bb, []byte{0x01}))
+       assert.NoError(t, appendBinaryData(bb, nil))
+       assert.NoError(t, appendBinaryData(bb, map[string]any{"bytes": 
[]byte{0x02}}))
+       assert.ErrorContains(t, appendBinaryData(bb, 42), "unexpected type int")
+       assert.ErrorContains(t, appendBinaryData(bb, map[string]any{"bytes": 
"text"}), "unexpected type string")
+       assert.Equal(t, 3, bb.Len())
+
+       sb := array.NewStringBuilder(memory.DefaultAllocator)
+       defer sb.Release()
+
+       assert.NoError(t, appendStringData(sb, "ok"))
+       assert.NoError(t, appendStringData(sb, []byte("ok")))
+       assert.NoError(t, appendStringData(sb, nil))
+       assert.NoError(t, appendStringData(sb, map[string]any{"string": "ok"}))
+       assert.ErrorContains(t, appendStringData(sb, 42), "unexpected type int")
+       assert.ErrorContains(t, appendStringData(sb, map[string]any{"string": 
42}), "unexpected type int")
+       assert.Equal(t, 4, sb.Len())
+}
+
+// loadDatum must surface appender errors from nested paths (map values,
+// list items), not only from top-level and struct fields.
+func TestLoadDatumPropagatesNestedAppendErrors(t *testing.T) {
+       newLoader := func(t *testing.T, avroSchema string) (*dataLoader, 
*array.RecordBuilder) {
+               t.Helper()
+               schema, err := hamba.Parse(avroSchema)
+               assert.NoError(t, err)
+               arrowSchema, err := ArrowSchemaFromAvro(schema)
+               assert.NoError(t, err)
+               bld := array.NewRecordBuilder(memory.DefaultAllocator, 
arrowSchema)
+               pos := newFieldPos()
+               ldr := newDataLoader()
+               for idx, fb := range bld.Fields() {
+                       mapFieldBuilders(fb, arrowSchema.Field(idx), pos)
+               }
+               ldr.drawTree(pos)
+               return ldr, bld
+       }
+
+       t.Run("map value", func(t *testing.T) {
+               ldr, bld := newLoader(t, `{"type":"record","name":"r","fields":[
+                       {"name":"m","type":{"type":"map","values":"bytes"}}]}`)
+               defer bld.Release()
+               assert.NoError(t, ldr.loadDatum(map[string]any{"m": 
map[string]any{"k": []byte{0x01}}}))
+               assert.ErrorContains(t, ldr.loadDatum(map[string]any{"m": 
map[string]any{"k": 42}}), "unexpected type int")
+       })
+
+       t.Run("list item", func(t *testing.T) {
+               ldr, bld := newLoader(t, `{"type":"record","name":"r","fields":[
+                       {"name":"l","type":{"type":"array","items":"bytes"}}]}`)
+               defer bld.Release()
+               assert.NoError(t, ldr.loadDatum(map[string]any{"l": 
[]any{[]byte{0x01}}}))
+               assert.ErrorContains(t, ldr.loadDatum(map[string]any{"l": 
[]any{42}}), "unexpected type int")
+       })
+}
diff --git a/arrow/avro/reader_types.go b/arrow/avro/reader_types.go
index aabad17e..45a7b145 100644
--- a/arrow/avro/reader_types.go
+++ b/arrow/avro/reader_types.go
@@ -92,10 +92,21 @@ func (d *dataLoader) drawTree(field *fieldPos) {
 // Since array.StructBuilder.AppendNull() will recursively append null to all 
of the
 // struct's fields, in the case of nil being passed to a struct's builderFunc 
it will
 // return a ErrNullStructData error to signal that all its sub-fields can be 
skipped.
+// filterNullStruct drops ErrNullStructData, which signals a null struct
+// whose sub-fields can be skipped rather than a failure.
+func filterNullStruct(err error) error {
+       if err == ErrNullStructData {
+               return nil
+       }
+       return err
+}
+
 func (d *dataLoader) loadDatum(data any) error {
        if d.list == nil && d.mapField == nil {
                if d.mapValue != nil {
-                       d.mapValue.appendFunc(data)
+                       if err := 
filterNullStruct(d.mapValue.appendFunc(data)); err != nil {
+                               return err
+                       }
                }
                var NullParent *fieldPos
                for _, f := range d.fields {
@@ -136,7 +147,9 @@ func (d *dataLoader) loadDatum(data any) error {
                                                }
                                        } else {
                                                for _, e := range dt {
-                                                       
d.children[0].loadDatum(e)
+                                                       if err := 
d.children[0].loadDatum(e); err != nil {
+                                                               return err
+                                                       }
                                                }
                                        }
                                case map[string]any:
@@ -154,16 +167,24 @@ func (d *dataLoader) loadDatum(data any) error {
                }
                for _, c := range d.children {
                        if c.list != nil {
-                               c.loadDatum(c.list.getValue(data))
+                               if err := c.loadDatum(c.list.getValue(data)); 
err != nil {
+                                       return err
+                               }
                        }
                        if c.mapField != nil {
                                switch dt := data.(type) {
                                case nil:
-                                       c.loadDatum(dt)
+                                       if err := c.loadDatum(dt); err != nil {
+                                               return err
+                                       }
                                case map[string]any:
-                                       c.loadDatum(c.mapField.getValue(dt))
+                                       if err := 
c.loadDatum(c.mapField.getValue(dt)); err != nil {
+                                               return err
+                                       }
                                default:
-                                       c.loadDatum(c.mapField.getValue(data))
+                                       if err := 
c.loadDatum(c.mapField.getValue(data)); err != nil {
+                                               return err
+                                       }
                                }
                        }
                }
@@ -171,12 +192,18 @@ func (d *dataLoader) loadDatum(data any) error {
                if d.list != nil {
                        switch dt := data.(type) {
                        case nil:
-                               d.list.appendFunc(dt)
+                               if err := 
filterNullStruct(d.list.appendFunc(dt)); err != nil {
+                                       return err
+                               }
                        case []any:
-                               d.list.appendFunc(dt)
+                               if err := 
filterNullStruct(d.list.appendFunc(dt)); err != nil {
+                                       return err
+                               }
                                for _, e := range dt {
                                        if d.item != nil {
-                                               d.item.appendFunc(e)
+                                               if err := 
filterNullStruct(d.item.appendFunc(e)); err != nil {
+                                                       return err
+                                               }
                                        }
                                        var NullParent *fieldPos
                                        for _, f := range d.fields {
@@ -194,18 +221,26 @@ func (d *dataLoader) loadDatum(data any) error {
                                        }
                                        for _, c := range d.children {
                                                if c.list != nil {
-                                                       
c.loadDatum(c.list.getValue(e))
+                                                       if err := 
c.loadDatum(c.list.getValue(e)); err != nil {
+                                                               return err
+                                                       }
                                                }
                                                if c.mapField != nil {
-                                                       
c.loadDatum(c.mapField.getValue(e))
+                                                       if err := 
c.loadDatum(c.mapField.getValue(e)); err != nil {
+                                                               return err
+                                                       }
                                                }
                                        }
                                }
                        case map[string]any:
-                               d.list.appendFunc(dt["array"])
+                               if err := 
filterNullStruct(d.list.appendFunc(dt["array"])); err != nil {
+                                       return err
+                               }
                                for _, e := range dt["array"].([]any) {
                                        if d.item != nil {
-                                               d.item.appendFunc(e)
+                                               if err := 
filterNullStruct(d.item.appendFunc(e)); err != nil {
+                                                       return err
+                                               }
                                        }
                                        var NullParent *fieldPos
                                        for _, f := range d.fields {
@@ -222,27 +257,40 @@ func (d *dataLoader) loadDatum(data any) error {
                                                }
                                        }
                                        for _, c := range d.children {
-                                               c.loadDatum(c.list.getValue(e))
+                                               if err := 
c.loadDatum(c.list.getValue(e)); err != nil {
+                                                       return err
+                                               }
                                        }
                                }
                        default:
-                               d.list.appendFunc(data)
-                               d.item.appendFunc(dt)
+                               if err := 
filterNullStruct(d.list.appendFunc(data)); err != nil {
+                                       return err
+                               }
+                               if err := 
filterNullStruct(d.item.appendFunc(dt)); err != nil {
+                                       return err
+                               }
                        }
                }
                if d.mapField != nil {
                        switch dt := data.(type) {
                        case nil:
-                               d.mapField.appendFunc(dt)
+                               if err := 
filterNullStruct(d.mapField.appendFunc(dt)); err != nil {
+                                       return err
+                               }
                        case map[string]any:
-
-                               d.mapField.appendFunc(dt)
+                               if err := 
filterNullStruct(d.mapField.appendFunc(dt)); err != nil {
+                                       return err
+                               }
                                for k, v := range dt {
-                                       d.mapKey.appendFunc(k)
+                                       if err := 
filterNullStruct(d.mapKey.appendFunc(k)); err != nil {
+                                               return err
+                                       }
                                        if d.mapValue != nil {
-                                               d.mapValue.appendFunc(v)
-                                       } else {
-                                               d.children[0].loadDatum(v)
+                                               if err := 
filterNullStruct(d.mapValue.appendFunc(v)); err != nil {
+                                                       return err
+                                               }
+                                       } else if err := 
d.children[0].loadDatum(v); err != nil {
+                                               return err
                                        }
                                }
                        }
@@ -397,8 +445,7 @@ func mapFieldBuilders(b array.Builder, field arrow.Field, 
parent *fieldPos) {
        switch bt := b.(type) {
        case *array.BinaryBuilder:
                f.appendFunc = func(data interface{}) error {
-                       appendBinaryData(bt, data)
-                       return nil
+                       return appendBinaryData(bt, data)
                }
        case *array.BinaryDictionaryBuilder:
                // has metadata for Avro enum symbols
@@ -551,8 +598,7 @@ func mapFieldBuilders(b array.Builder, field arrow.Field, 
parent *fieldPos) {
                }
        case *array.StringBuilder:
                f.appendFunc = func(data interface{}) error {
-                       appendStringData(bt, data)
-                       return nil
+                       return appendStringData(bt, data)
                }
        case *array.StructBuilder:
                // has metadata for Avro Union named types
@@ -590,20 +636,25 @@ func mapFieldBuilders(b array.Builder, field arrow.Field, 
parent *fieldPos) {
        }
 }
 
-func appendBinaryData(b *array.BinaryBuilder, data interface{}) {
+func appendBinaryData(b *array.BinaryBuilder, data interface{}) error {
        switch dt := data.(type) {
        case nil:
                b.AppendNull()
+       case []byte:
+               b.Append(dt)
        case map[string]any:
                switch ct := dt["bytes"].(type) {
                case nil:
                        b.AppendNull()
+               case []byte:
+                       b.Append(ct)
                default:
-                       b.Append(ct.([]byte))
+                       return fmt.Errorf("unexpected type %T for avro bytes 
union value", ct)
                }
        default:
-               b.Append(fmt.Append([]byte{}, data))
+               return fmt.Errorf("unexpected type %T for avro bytes value", 
data)
        }
+       return nil
 }
 
 func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data interface{}) {
@@ -853,22 +904,27 @@ func appendInt64Data(b *array.Int64Builder, data 
interface{}) {
        }
 }
 
-func appendStringData(b *array.StringBuilder, data interface{}) {
+func appendStringData(b *array.StringBuilder, data interface{}) error {
        switch dt := data.(type) {
        case nil:
                b.AppendNull()
        case string:
                b.Append(dt)
+       case []byte:
+               b.Append(string(dt))
        case map[string]any:
                switch v := dt["string"].(type) {
                case nil:
                        b.AppendNull()
                case string:
                        b.Append(v)
+               default:
+                       return fmt.Errorf("unexpected type %T for avro string 
union value", v)
                }
        default:
-               b.Append(fmt.Sprint(data))
+               return fmt.Errorf("unexpected type %T for avro string value", 
data)
        }
+       return nil
 }
 
 func appendTime32Data(b *array.Time32Builder, data interface{}) {
diff --git a/arrow/avro/testdata/testdata.go b/arrow/avro/testdata/testdata.go
index a5090b40..4c8bac71 100644
--- a/arrow/avro/testdata/testdata.go
+++ b/arrow/avro/testdata/testdata.go
@@ -42,9 +42,7 @@ const (
 type ByteArray []byte
 
 func (b ByteArray) MarshalJSON() ([]byte, error) {
-       s := fmt.Sprint(b)
-       encoded := base64.StdEncoding.EncodeToString([]byte(s))
-       return json.Marshal(encoded)
+       return json.Marshal(base64.StdEncoding.EncodeToString(b))
 }
 
 type TimestampMicros int64

Reply via email to