This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push: new b196d3b fix(arrow/avro-reader): bunch of types that didn't work (#416) b196d3b is described below commit b196d3b316d09f63786f021d4f1baa1fdd7620d2 Author: Willem Jan <51120533+willem-j...@users.noreply.github.com> AuthorDate: Tue Jul 8 20:00:32 2025 +0200 fix(arrow/avro-reader): bunch of types that didn't work (#416) ### Rationale for this change #415 ### What changes are included in this PR? Fix decoding of avro types that no longer worked: fixed bytes, timestamp, time, date & decimals ### Are these changes tested? Added tests to the avro/reader_test.go ### Are there any user-facing changes? No API changes, just broader compatibility. --------- Co-authored-by: Willem Jan Noort <willemjan.no...@essent.nl> --- arrow/avro/reader_test.go | 253 +++++++-------------------------- arrow/avro/reader_types.go | 63 ++++++++- arrow/avro/schema_test.go | 203 +-------------------------- arrow/avro/testdata/alltypes.avsc | 205 +++++++++++++++++++++++++++ arrow/avro/testdata/testdata.go | 287 ++++++++++++++++++++++++++++++++++++++ dev/release/rat_exclude_files.txt | 1 + 6 files changed, 612 insertions(+), 400 deletions(-) diff --git a/arrow/avro/reader_test.go b/arrow/avro/reader_test.go index 1b07fbb..d87948a 100644 --- a/arrow/avro/reader_test.go +++ b/arrow/avro/reader_test.go @@ -17,212 +17,24 @@ package avro import ( + "bytes" + "encoding/json" "fmt" + "os" + "path/filepath" "testing" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/avro/testdata" hamba "github.com/hamba/avro/v2" + "github.com/stretchr/testify/assert" ) -func TestEditSchemaStringEqual(t *testing.T) { +func TestReader(t *testing.T) { tests := []struct { - avroSchema string arrowSchema []arrow.Field }{ { - avroSchema: `{ - "fields": [ - { - "name": "inheritNull", - "type": { - "name": "Simple", - "symbols": [ - "a", - "b" - ], - "type": "enum" - } - }, - { - "name": "explicitNamespace", - "type": { - "name": 
"test", - "namespace": "org.hamba.avro", - "size": 12, - "type": "fixed" - } - }, - { - "name": "fullName", - "type": { - "type": "record", - "name": "fullName_data", - "namespace": "ignored", - "doc": "A name attribute with a fullname, so the namespace attribute is ignored. The fullname is 'a.full.Name', and the namespace is 'a.full'.", - "fields": [{ - "name": "inheritNamespace", - "type": { - "type": "enum", - "name": "Understanding", - "doc": "A simple name (attribute) and no namespace attribute: inherit the namespace of the enclosing type 'a.full.Name'. The fullname is 'a.full.Understanding'.", - "symbols": ["d", "e"] - } - }, { - "name": "md5", - "type": { - "name": "md5_data", - "type": "fixed", - "size": 16, - "namespace": "ignored" - } - } - ] - } - }, - { - "name": "id", - "type": "int" - }, - { - "name": "bigId", - "type": "long" - }, - { - "name": "temperature", - "type": [ - "null", - "float" - ] - }, - { - "name": "fraction", - "type": [ - "null", - "double" - ] - }, - { - "name": "is_emergency", - "type": "boolean" - }, - { - "name": "remote_ip", - "type": [ - "null", - "bytes" - ] - }, - { - "name": "person", - "type": { - "fields": [ - { - "name": "lastname", - "type": "string" - }, - { - "name": "address", - "type": { - "fields": [ - { - "name": "streetaddress", - "type": "string" - }, - { - "name": "city", - "type": "string" - } - ], - "name": "AddressUSRecord", - "type": "record" - } - }, - { - "name": "mapfield", - "type": { - "default": { - }, - "type": "map", - "values": "long" - } - }, - { - "name": "arrayField", - "type": { - "default": [ - ], - "items": "string", - "type": "array" - } - } - ], - "name": "person_data", - "type": "record" - } - }, - { - "name": "decimalField", - "type": { - "logicalType": "decimal", - "precision": 4, - "scale": 2, - "type": "bytes" - } - }, - { - "logicalType": "uuid", - "name": "uuidField", - "type": "string" - }, - { - "name": "timemillis", - "type": { - "type": "int", - "logicalType": "time-millis" - } - 
}, - { - "name": "timemicros", - "type": { - "type": "long", - "logicalType": "time-micros" - } - }, - { - "name": "timestampmillis", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - } - }, - { - "name": "timestampmicros", - "type": { - "type": "long", - "logicalType": "timestamp-micros" - } - }, - { - "name": "duration", - "type": { - "name": "duration", - "namespace": "whyowhy", - "logicalType": "duration", - "size": 12, - "type": "fixed" - } - }, - { - "name": "date", - "type": { - "logicalType": "date", - "type": "int" - } - } - ], - "name": "Example", - "type": "record" - }`, arrowSchema: []arrow.Field{ { Name: "explicitNamespace", @@ -303,6 +115,10 @@ func TestEditSchemaStringEqual(t *testing.T) { Name: "decimalField", Type: &arrow.Decimal128Type{Precision: 4, Scale: 2}, }, + { + Name: "decimal256Field", + Type: &arrow.Decimal256Type{Precision: 60, Scale: 2}, + }, { Name: "uuidField", Type: arrow.BinaryTypes.String, @@ -336,12 +152,15 @@ func TestEditSchemaStringEqual(t *testing.T) { } for _, test := range tests { - t.Run("", func(t *testing.T) { + tp := testdata.Generate() + defer os.RemoveAll(filepath.Dir(tp.Avro)) + + t.Run("ShouldParseSchemaWithEdits", func(t *testing.T) { want := arrow.NewSchema(test.arrowSchema, nil) - schema, err := hamba.ParseBytes([]byte(test.avroSchema)) + schema, err := testdata.AllTypesAvroSchema() if err != nil { - t.Fatalf("%v", err) + t.Fatal(err) } r := new(OCFReader) r.avroSchema = schema.String() @@ -354,11 +173,45 @@ func TestEditSchemaStringEqual(t *testing.T) { if err != nil { t.Fatalf("%v", err) } + assert.Equal(t, want.String(), got.String()) if fmt.Sprintf("%+v", want.String()) != fmt.Sprintf("%+v", got.String()) { t.Fatalf("got=%v,\n want=%v", got.String(), want.String()) - } else { - t.Logf("schema.String() comparison passed") } }) + + t.Run("ShouldLoadExpectedRecords", func(t *testing.T) { + b, err := os.ReadFile(tp.Avro) + if err != nil { + t.Error(err) + } + r := bytes.NewReader(b) + + opts := 
[]Option{WithChunk(-1)} + ar, err := NewOCFReader(r, opts...) + if err != nil { + t.Error(err) + } + defer ar.Close() + + exists := ar.Next() + + if ar.Err() != nil { + t.Error("failed to read next record: %w", ar.Err()) + } + if !exists { + t.Error("no record exists") + } + a, err := ar.Record().MarshalJSON() + assert.NoError(t, err) + var avroParsed []map[string]any + json.Unmarshal(a, &avroParsed) + + j, err := os.ReadFile(tp.Json) + assert.NoError(t, err) + var jsonParsed map[string]any + json.Unmarshal(j, &jsonParsed) + + assert.Equal(t, jsonParsed, avroParsed[0]) + }) } } diff --git a/arrow/avro/reader_types.go b/arrow/avro/reader_types.go index 50f0b18..ff21b5a 100644 --- a/arrow/avro/reader_types.go +++ b/arrow/avro/reader_types.go @@ -22,6 +22,8 @@ import ( "errors" "fmt" "math/big" + "reflect" + "time" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" @@ -29,6 +31,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/decimal256" "github.com/apache/arrow-go/v18/arrow/extensions" "github.com/apache/arrow-go/v18/arrow/memory" + hamba "github.com/hamba/avro/v2" ) type dataLoader struct { @@ -422,7 +425,11 @@ func mapFieldBuilders(b array.Builder, field arrow.Field, parent *fieldPos) { } case *array.Decimal128Builder: f.appendFunc = func(data interface{}) error { - err := appendDecimal128Data(bt, data) + typ, ok := field.Type.(arrow.DecimalType) + if !ok { + return nil + } + err := appendDecimal128Data(bt, data, typ) if err != nil { return err } @@ -430,7 +437,11 @@ func mapFieldBuilders(b array.Builder, field arrow.Field, parent *fieldPos) { } case *array.Decimal256Builder: f.appendFunc = func(data interface{}) error { - err := appendDecimal256Data(bt, data) + typ, ok := field.Type.(arrow.DecimalType) + if !ok { + return nil + } + err := appendDecimal256Data(bt, data, typ) if err != nil { return err } @@ -640,10 +651,12 @@ func appendDate32Data(b *array.Date32Builder, data interface{}) { case int32: b.Append(arrow.Date32(v)) } 
+ case time.Time: + b.Append(arrow.Date32FromTime(dt)) } } -func appendDecimal128Data(b *array.Decimal128Builder, data interface{}) error { +func appendDecimal128Data(b *array.Decimal128Builder, data interface{}, typ arrow.DecimalType) error { switch dt := data.(type) { case nil: b.AppendNull() @@ -673,11 +686,19 @@ func appendDecimal128Data(b *array.Decimal128Builder, data interface{}) error { var bigIntData big.Int b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) } + case *big.Rat: + v := bigRatToBigInt(dt, typ) + + if v.IsInt64() { + b.Append(decimal128.FromI64(v.Int64())) + } else { + b.Append(decimal128.FromBigInt(v)) + } } return nil } -func appendDecimal256Data(b *array.Decimal256Builder, data interface{}) error { +func appendDecimal256Data(b *array.Decimal256Builder, data interface{}, typ arrow.DecimalType) error { switch dt := data.(type) { case nil: b.AppendNull() @@ -689,10 +710,21 @@ func appendDecimal256Data(b *array.Decimal256Builder, data interface{}) error { var bigIntData big.Int buf := bytes.NewBuffer(dt["bytes"].([]byte)) b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + case *big.Rat: + b.Append(decimal256.FromBigInt(bigRatToBigInt(dt, typ))) } return nil } +func bigRatToBigInt(dt *big.Rat, typ arrow.DecimalType) *big.Int { + scale := big.NewInt(int64(typ.GetScale())) + scaledNum := new(big.Int).Set(dt.Num()) + scaleFactor := new(big.Int).Exp(big.NewInt(10), scale, nil) + scaledNum.Mul(scaledNum, scaleFactor) + scaledNum.Quo(scaledNum, dt.Denom()) + return scaledNum +} + // Avro duration logical type annotates Avro fixed type of size 12, which stores three little-endian // unsigned integers that represent durations at different granularities of time. The first stores // a number in months, the second stores a number in days, and the third stores a number in milliseconds. 
@@ -717,6 +749,12 @@ func appendDurationData(b *array.MonthDayNanoIntervalBuilder, data interface{}) dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dtb[8:]) * 1000000) b.Append(*dur) } + case hamba.LogicalDuration: + b.Append(arrow.MonthDayNanoInterval{ + Months: int32(dt.Months), + Days: int32(dt.Days), + Nanoseconds: int64(dt.Milliseconds) * int64(time.Millisecond), + }) } } @@ -733,6 +771,13 @@ func appendFixedSizeBinaryData(b *array.FixedSizeBinaryBuilder, data interface{} case []byte: b.Append(v) } + default: + v := reflect.ValueOf(data) + if v.Kind() == reflect.Array && v.Type().Elem().Kind() == reflect.Uint8 { + bytes := make([]byte, v.Len()) + reflect.Copy(reflect.ValueOf(bytes), v) + b.Append(bytes) + } } } @@ -839,6 +884,8 @@ func appendTime32Data(b *array.Time32Builder, data interface{}) { case int32: b.Append(arrow.Time32(v)) } + case time.Duration: + b.Append(arrow.Time32(dt.Milliseconds())) } } @@ -855,6 +902,8 @@ func appendTime64Data(b *array.Time64Builder, data interface{}) { case int64: b.Append(arrow.Time64(v)) } + case time.Duration: + b.Append(arrow.Time64(dt.Microseconds())) } } @@ -871,5 +920,11 @@ func appendTimestampData(b *array.TimestampBuilder, data interface{}) { case int64: b.Append(arrow.Timestamp(v)) } + case time.Time: + v, err := arrow.TimestampFromTime(dt, b.Type().(*arrow.TimestampType).Unit) + if err != nil { + panic(err) + } + b.Append(v) } } diff --git a/arrow/avro/schema_test.go b/arrow/avro/schema_test.go index 385bd4b..d813da0 100644 --- a/arrow/avro/schema_test.go +++ b/arrow/avro/schema_test.go @@ -21,208 +21,14 @@ import ( "testing" "github.com/apache/arrow-go/v18/arrow" - hamba "github.com/hamba/avro/v2" + "github.com/apache/arrow-go/v18/arrow/avro/testdata" ) func TestSchemaStringEqual(t *testing.T) { tests := []struct { - avroSchema string arrowSchema []arrow.Field }{ { - avroSchema: `{ - "fields": [ - { - "name": "inheritNull", - "type": { - "name": "Simple", - "symbols": [ - "a", - "b" - ], - "type": "enum" - } 
- }, - { - "name": "explicitNamespace", - "type": { - "name": "test", - "namespace": "org.hamba.avro", - "size": 12, - "type": "fixed" - } - }, - { - "name": "fullName", - "type": { - "type": "record", - "name": "fullName_data", - "namespace": "ignored", - "doc": "A name attribute with a fullname, so the namespace attribute is ignored. The fullname is 'a.full.Name', and the namespace is 'a.full'.", - "fields": [{ - "name": "inheritNamespace", - "type": { - "type": "enum", - "name": "Understanding", - "doc": "A simple name (attribute) and no namespace attribute: inherit the namespace of the enclosing type 'a.full.Name'. The fullname is 'a.full.Understanding'.", - "symbols": ["d", "e"] - } - }, { - "name": "md5", - "type": { - "name": "md5_data", - "type": "fixed", - "size": 16, - "namespace": "ignored" - } - } - ] - } - }, - { - "name": "id", - "type": "int" - }, - { - "name": "bigId", - "type": "long" - }, - { - "name": "temperature", - "type": [ - "null", - "float" - ] - }, - { - "name": "fraction", - "type": [ - "null", - "double" - ] - }, - { - "name": "is_emergency", - "type": "boolean" - }, - { - "name": "remote_ip", - "type": [ - "null", - "bytes" - ] - }, - { - "name": "person", - "type": { - "fields": [ - { - "name": "lastname", - "type": "string" - }, - { - "name": "address", - "type": { - "fields": [ - { - "name": "streetaddress", - "type": "string" - }, - { - "name": "city", - "type": "string" - } - ], - "name": "AddressUSRecord", - "type": "record" - } - }, - { - "name": "mapfield", - "type": { - "default": { - }, - "type": "map", - "values": "long" - } - }, - { - "name": "arrayField", - "type": { - "default": [ - ], - "items": "string", - "type": "array" - } - } - ], - "name": "person_data", - "type": "record" - } - }, - { - "name": "decimalField", - "type": { - "logicalType": "decimal", - "precision": 4, - "scale": 2, - "type": "bytes" - } - }, - { - "logicalType": "uuid", - "name": "uuidField", - "type": "string" - }, - { - "name": "timemillis", - 
"type": { - "type": "int", - "logicalType": "time-millis" - } - }, - { - "name": "timemicros", - "type": { - "type": "long", - "logicalType": "time-micros" - } - }, - { - "name": "timestampmillis", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - } - }, - { - "name": "timestampmicros", - "type": { - "type": "long", - "logicalType": "timestamp-micros" - } - }, - { - "name": "duration", - "type": { - "name": "duration", - "namespace": "whyowhy", - "logicalType": "duration", - "size": 12, - "type": "fixed" - } - }, - { - "name": "date", - "type": { - "logicalType": "date", - "type": "int" - } - } - ], - "name": "Example", - "type": "record" - }`, arrowSchema: []arrow.Field{ { Name: "inheritNull", @@ -309,6 +115,10 @@ func TestSchemaStringEqual(t *testing.T) { Name: "decimalField", Type: &arrow.Decimal128Type{Precision: 4, Scale: 2}, }, + { + Name: "decimal256Field", + Type: &arrow.Decimal256Type{Precision: 60, Scale: 2}, + }, { Name: "uuidField", Type: arrow.BinaryTypes.String, @@ -344,7 +154,8 @@ func TestSchemaStringEqual(t *testing.T) { for _, test := range tests { t.Run("", func(t *testing.T) { want := arrow.NewSchema(test.arrowSchema, nil) - schema, err := hamba.ParseBytes([]byte(test.avroSchema)) + + schema, err := testdata.AllTypesAvroSchema() if err != nil { t.Fatalf("%v", err) } diff --git a/arrow/avro/testdata/alltypes.avsc b/arrow/avro/testdata/alltypes.avsc new file mode 100644 index 0000000..a4e3037 --- /dev/null +++ b/arrow/avro/testdata/alltypes.avsc @@ -0,0 +1,205 @@ +{ + "fields": [ + { + "name": "inheritNull", + "type": { + "name": "Simple", + "symbols": [ + "a", + "b" + ], + "type": "enum" + } + }, + { + "name": "explicitNamespace", + "type": { + "name": "test", + "namespace": "org.hamba.avro", + "size": 12, + "type": "fixed" + } + }, + { + "name": "fullName", + "type": { + "type": "record", + "name": "fullName_data", + "namespace": "ignored", + "doc": "A name attribute with a fullname, so the namespace attribute is ignored. 
The fullname is 'a.full.Name', and the namespace is 'a.full'.", + "fields": [ + { + "name": "inheritNamespace", + "type": { + "type": "enum", + "name": "Understanding", + "doc": "A simple name (attribute) and no namespace attribute: inherit the namespace of the enclosing type 'a.full.Name'. The fullname is 'a.full.Understanding'.", + "symbols": [ + "d", + "e" + ] + } + }, + { + "name": "md5", + "type": { + "name": "md5_data", + "type": "fixed", + "size": 16, + "namespace": "ignored" + } + } + ] + } + }, + { + "name": "id", + "type": "int" + }, + { + "name": "bigId", + "type": "long" + }, + { + "name": "temperature", + "type": [ + "null", + "float" + ] + }, + { + "name": "fraction", + "type": [ + "null", + "double" + ] + }, + { + "name": "is_emergency", + "type": "boolean" + }, + { + "name": "remote_ip", + "type": [ + "null", + "bytes" + ] + }, + { + "name": "person", + "type": { + "fields": [ + { + "name": "lastname", + "type": "string" + }, + { + "name": "address", + "type": { + "fields": [ + { + "name": "streetaddress", + "type": "string" + }, + { + "name": "city", + "type": "string" + } + ], + "name": "AddressUSRecord", + "type": "record" + } + }, + { + "name": "mapfield", + "type": { + "default": {}, + "type": "map", + "values": "long" + } + }, + { + "name": "arrayField", + "type": { + "default": [], + "items": "string", + "type": "array" + } + } + ], + "name": "person_data", + "type": "record" + } + }, + { + "name": "decimalField", + "type": { + "logicalType": "decimal", + "precision": 4, + "scale": 2, + "type": "bytes" + } + }, + { + "name": "decimal256Field", + "type": { + "logicalType": "decimal", + "precision": 60, + "scale": 2, + "type": "bytes" + } + }, + { + "logicalType": "uuid", + "name": "uuidField", + "type": "string" + }, + { + "name": "timemillis", + "type": { + "type": "int", + "logicalType": "time-millis" + } + }, + { + "name": "timemicros", + "type": { + "type": "long", + "logicalType": "time-micros" + } + }, + { + "name": "timestampmillis", + 
"type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "timestampmicros", + "type": { + "type": "long", + "logicalType": "timestamp-micros" + } + }, + { + "name": "duration", + "type": { + "name": "duration", + "namespace": "whyowhy", + "logicalType": "duration", + "size": 12, + "type": "fixed" + } + }, + { + "name": "date", + "type": { + "logicalType": "date", + "type": "int" + } + } + ], + "name": "Example", + "type": "record" +} diff --git a/arrow/avro/testdata/testdata.go b/arrow/avro/testdata/testdata.go new file mode 100644 index 0000000..926c054 --- /dev/null +++ b/arrow/avro/testdata/testdata.go @@ -0,0 +1,287 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package testdata + +import ( + "encoding/base64" + "encoding/binary" + "encoding/json" + "fmt" + "log" + "math/big" + "os" + "path/filepath" + "strings" + "time" + + avro "github.com/hamba/avro/v2" + "github.com/hamba/avro/v2/ocf" +) + +const ( + SchemaFileName = "alltypes.avsc" + sampleAvroFileName = "alltypes.avro" + sampleJSONFileName = "alltypes.json" + decimalTypeScale = 2 +) + +type ByteArray []byte + +func (b ByteArray) MarshalJSON() ([]byte, error) { + s := fmt.Sprint(b) + encoded := base64.StdEncoding.EncodeToString([]byte(s)) + return json.Marshal(encoded) +} + +type TimestampMicros int64 + +func (t TimestampMicros) MarshalJSON() ([]byte, error) { + ts := time.Unix(0, int64(t)*int64(time.Microsecond)).UTC().Format("2006-01-02 15:04:05.000000") + // arrow record marshaller trims trailing zero digits from timestamp so we do the same + return json.Marshal(fmt.Sprintf("%sZ", strings.TrimRight(ts, "0."))) +} + +type TimestampMillis int64 + +func (t TimestampMillis) MarshalJSON() ([]byte, error) { + ts := time.Unix(0, int64(t)*int64(time.Millisecond)).UTC().Format("2006-01-02 15:04:05.000") + return json.Marshal(fmt.Sprintf("%sZ", strings.TrimRight(ts, "0."))) +} + +type TimeMillis time.Duration + +func (t TimeMillis) MarshalJSON() ([]byte, error) { + ts := time.Unix(0, int64(t)).UTC().Format("15:04:05.000") + return json.Marshal(strings.TrimRight(ts, "0.")) +} + +type TimeMicros time.Duration + +func (t TimeMicros) MarshalJSON() ([]byte, error) { + ts := time.Unix(0, int64(t)).UTC().Format("15:04:05.000000") + return json.Marshal(strings.TrimRight(ts, "0.")) +} + +type ExplicitNamespace [12]byte + +func (t ExplicitNamespace) MarshalJSON() ([]byte, error) { + return json.Marshal(t[:]) +} + +type MD5 [16]byte + +func (t MD5) MarshalJSON() ([]byte, error) { + return json.Marshal(t[:]) +} + +type DecimalType []byte + +func (t DecimalType) MarshalJSON() ([]byte, error) { + v := new(big.Int).SetBytes(t) + s := fmt.Sprintf("%0*s", decimalTypeScale+1, v.String()) + 
point := len(s) - decimalTypeScale + return json.Marshal(s[:point] + "." + s[point:]) +} + +type Duration [12]byte + +func (t Duration) MarshalJSON() ([]byte, error) { + milliseconds := int32(binary.LittleEndian.Uint32(t[8:12])) + + m := map[string]interface{}{ + "months": int32(binary.LittleEndian.Uint32(t[0:4])), + "days": int32(binary.LittleEndian.Uint32(t[4:8])), + "nanoseconds": int64(milliseconds) * int64(time.Millisecond), + } + return json.Marshal(m) +} + +type Date int32 + +func (t Date) MarshalJSON() ([]byte, error) { + v := time.Unix(int64(t)*86400, 0).UTC().Format("2006-01-02") + return json.Marshal(v) +} + +type Example struct { + InheritNull string `avro:"inheritNull" json:"inheritNull"` + ExplicitNamespace ExplicitNamespace `avro:"explicitNamespace" json:"explicitNamespace"` + FullName FullNameData `avro:"fullName" json:"fullName"` + ID int32 `avro:"id" json:"id"` + BigID int64 `avro:"bigId" json:"bigId"` + Temperature *float32 `avro:"temperature" json:"temperature"` + Fraction *float64 `avro:"fraction" json:"fraction"` + IsEmergency bool `avro:"is_emergency" json:"is_emergency"` + RemoteIP *ByteArray `avro:"remote_ip" json:"remote_ip"` + Person PersonData `avro:"person" json:"person"` + DecimalField DecimalType `avro:"decimalField" json:"decimalField"` + Decimal256Field DecimalType `avro:"decimal256Field" json:"decimal256Field"` + UUIDField string `avro:"uuidField" json:"uuidField"` + TimeMillis TimeMillis `avro:"timemillis" json:"timemillis"` + TimeMicros TimeMicros `avro:"timemicros" json:"timemicros"` + TimestampMillis TimestampMillis `avro:"timestampmillis" json:"timestampmillis"` + TimestampMicros TimestampMicros `avro:"timestampmicros" json:"timestampmicros"` + Duration Duration `avro:"duration" json:"duration"` + Date Date `avro:"date" json:"date"` +} + +type FullNameData struct { + InheritNamespace string `avro:"inheritNamespace" json:"inheritNamespace"` + Md5 MD5 `avro:"md5" json:"md5"` +} +type MapField map[string]int64 + +func (t 
MapField) MarshalJSON() ([]byte, error) { + arr := make([]map[string]any, 0, len(t)) + for k, v := range t { + arr = append(arr, map[string]any{"key": k, "value": v}) + } + return json.Marshal(arr) +} + +type PersonData struct { + Lastname string `avro:"lastname" json:"lastname"` + Address AddressUSRecord `avro:"address" json:"address"` + Mapfield MapField `avro:"mapfield" json:"mapfield"` + ArrayField []string `avro:"arrayField" json:"arrayField"` +} + +type AddressUSRecord struct { + Streetaddress string `avro:"streetaddress" json:"streetaddress"` + City string `avro:"city" json:"city"` +} + +type TestPaths struct { + Avro string + Json string +} + +func Generate() TestPaths { + td, err := os.MkdirTemp("", "arrow-avro-testdata-*") + if err != nil { + log.Fatalf("failed to create temp dir: %v", err) + } + data := sampleData() + return TestPaths{ + Avro: writeOCFSampleData(td, data), + Json: writeJSONSampleData(td, data), + } +} + +func TestdataDir() string { + cwd, err := os.Getwd() + if err != nil { + log.Fatalf("failed to get cwd: %v", err) + } + switch filepath.Base(cwd) { + case "arrow-go": + return filepath.Join("arrow", "avro", "testdata") + case "avro": + return "testdata" + case "testdata": + return "." 
+ } + log.Fatalf("unexpected cwd: %s", cwd) + return "" +} + +func AllTypesAvroSchema() (avro.Schema, error) { + sp := filepath.Join(TestdataDir(), SchemaFileName) + avroSchemaBytes, err := os.ReadFile(sp) + if err != nil { + return nil, err + } + return avro.ParseBytes(avroSchemaBytes) +} + +func sampleData() Example { + return Example{ + InheritNull: "a", + ExplicitNamespace: ExplicitNamespace{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + FullName: FullNameData{ + InheritNamespace: "d", + Md5: MD5{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + }, + ID: 42, + BigID: 42000000000, + Temperature: func() *float32 { v := float32(36.6); return &v }(), + Fraction: func() *float64 { v := float64(0.75); return &v }(), + IsEmergency: true, + RemoteIP: func() *ByteArray { v := ByteArray{192, 168, 1, 1}; return &v }(), + Person: PersonData{ + Lastname: "Doe", + Address: AddressUSRecord{ + Streetaddress: "123 Main St", + City: "Metropolis", + }, + Mapfield: MapField{"foo": 123}, + ArrayField: []string{"one", "two"}, + }, + DecimalField: DecimalType{0x00, 0x00, 0x00, 0x00, 0x00, 0x26, 0x94}, + Decimal256Field: DecimalType{ + 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, + 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, + 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x01, + }, + UUIDField: "123e4567-e89b-12d3-a456-426614174000", + TimeMillis: TimeMillis(50412345 * time.Millisecond), + TimeMicros: TimeMicros(50412345678 * time.Microsecond), + TimestampMillis: TimestampMillis(time.Now().UnixNano() / int64(time.Millisecond)), + TimestampMicros: TimestampMicros(time.Now().UnixNano() / int64(time.Microsecond)), + Duration: Duration{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + Date: Date(time.Now().Unix() / 86400), + } +} + +func writeOCFSampleData(td string, data Example) string { + path := filepath.Join(td, sampleAvroFileName) + ocfFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + log.Fatal(err) + } + defer ocfFile.Close() + schema, err 
:= AllTypesAvroSchema() + if err != nil { + log.Fatal(err) + } + encoder, err := ocf.NewEncoder(schema.String(), ocfFile) + if err != nil { + log.Fatal(err) + } + defer encoder.Close() + + err = encoder.Encode(data) + if err != nil { + log.Fatal(err) + } + return path +} + +func writeJSONSampleData(td string, data Example) string { + path := filepath.Join(td, sampleJSONFileName) + jsonFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + log.Fatal(err) + } + defer jsonFile.Close() + enc := json.NewEncoder(jsonFile) + err = enc.Encode(data) + if err != nil { + log.Fatal(err) + } + return path +} diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt index adad69e..76c9c81 100644 --- a/dev/release/rat_exclude_files.txt +++ b/dev/release/rat_exclude_files.txt @@ -21,6 +21,7 @@ go.sum .github/pull_request_template.md +arrow/avro/testdata/alltypes.avsc arrow/flight/gen/flight/*.pb.go arrow/type_string.go arrow/unionmode_string.go