This is an automated email from the ASF dual-hosted git repository. sbinet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new dee0c1f ARROW-5591: [Go] implement read/write IPC for Duration & Intervals dee0c1f is described below commit dee0c1f0d404192d3ba222fc4be7aee88ad3c16b Author: Sebastien Binet <bi...@cern.ch> AuthorDate: Fri Jun 14 18:25:07 2019 +0200 ARROW-5591: [Go] implement read/write IPC for Duration & Intervals Author: Sebastien Binet <bi...@cern.ch> Closes #4564 from sbinet/issue-5591 and squashes the following commits: c2e638b28 <Sebastien Binet> go/arrow/ipc: implement read/write IPC for Duration 375844faf <Sebastien Binet> ARROW-5591: implement read/write IPC for Duration & Intervals --- go/arrow/internal/arrdata/arrdata.go | 155 +++++++++++++++++++++++++++++++++++ go/arrow/ipc/file_reader.go | 4 +- go/arrow/ipc/metadata.go | 53 ++++++++++++ 3 files changed, 211 insertions(+), 1 deletion(-) diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go index e76d68a..aeb7ee5 100644 --- a/go/arrow/internal/arrdata/arrdata.go +++ b/go/arrow/internal/arrdata/arrdata.go @@ -40,6 +40,8 @@ func init() { Records["fixed_size_lists"] = makeFixedSizeListsRecords() Records["fixed_width_types"] = makeFixedWidthTypesRecords() Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords() + Records["intervals"] = makeIntervalsRecords() + Records["durations"] = makeDurationsRecords() for k := range Records { RecordNames = append(RecordNames, k) @@ -474,6 +476,105 @@ func makeFixedSizeBinariesRecords() []array.Record { return recs } +func makeIntervalsRecords() []array.Record { + mem := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + arrow.Field{Name: "months", Type: arrow.FixedWidthTypes.MonthInterval, Nullable: true}, + arrow.Field{Name: "days", Type: arrow.FixedWidthTypes.DayTimeInterval, Nullable: true}, + }, nil, + ) + + mask := []bool{true, false, false, true, true} + chunks := [][]array.Interface{ + []array.Interface{ + arrayOf(mem, []arrow.MonthInterval{1, 2, 3, 4, 5}, mask), + arrayOf(mem, []arrow.DayTimeInterval{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}}, mask), + }, + []array.Interface{ + arrayOf(mem, []arrow.MonthInterval{11, 12, 13, 14, 15}, mask), + arrayOf(mem, []arrow.DayTimeInterval{{11, 11}, {12, 12}, {13, 13}, {14, 14}, {15, 15}}, mask), + }, + []array.Interface{ + arrayOf(mem, []arrow.MonthInterval{21, 22, 23, 24, 25}, mask), + arrayOf(mem, []arrow.DayTimeInterval{{21, 21}, {22, 22}, {23, 23}, {24, 24}, {25, 25}}, mask), + }, + } + + defer func() { + for _, chunk := range chunks { + for _, col := range chunk { + col.Release() + } + } + }() + + recs := make([]array.Record, len(chunks)) + for i, chunk := range chunks { + recs[i] = array.NewRecord(schema, chunk, -1) + } + + return recs +} + +type ( + duration_s arrow.Duration + duration_ms arrow.Duration + duration_us arrow.Duration + duration_ns arrow.Duration +) + +func makeDurationsRecords() []array.Record { + mem := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + arrow.Field{Name: "durations-s", Type: &arrow.DurationType{Unit: arrow.Second}, Nullable: true}, + arrow.Field{Name: "durations-ms", Type: &arrow.DurationType{Unit: arrow.Millisecond}, Nullable: true}, + arrow.Field{Name: "durations-us", Type: &arrow.DurationType{Unit: arrow.Microsecond}, Nullable: true}, + arrow.Field{Name: "durations-ns", Type: &arrow.DurationType{Unit: arrow.Nanosecond}, Nullable: true}, + }, nil, + ) + + mask := []bool{true, false, false, true, true} + chunks := [][]array.Interface{ + []array.Interface{ + arrayOf(mem, []duration_s{1, 2, 3, 4, 5}, mask), + arrayOf(mem, []duration_ms{1, 2, 3, 4, 5}, mask), + arrayOf(mem, []duration_us{1, 2, 3, 4, 5}, mask), + arrayOf(mem, []duration_ns{1, 2, 3, 4, 5}, mask), + }, + []array.Interface{ + arrayOf(mem, []duration_s{11, 12, 13, 14, 15}, mask), + arrayOf(mem, []duration_ms{11, 12, 13, 14, 15}, mask), + arrayOf(mem, []duration_us{11, 12, 13, 14, 15}, mask), + arrayOf(mem, []duration_ns{11, 12, 13, 14, 15}, mask), + }, + []array.Interface{ + arrayOf(mem, []duration_s{21, 22, 23, 24, 25}, mask), + arrayOf(mem, []duration_ms{21, 22, 23, 24, 25}, mask), + arrayOf(mem, []duration_us{21, 22, 23, 24, 25}, mask), + arrayOf(mem, []duration_ns{21, 22, 23, 24, 25}, mask), + }, + } + + defer func() { + for _, chunk := range chunks { + for _, col := range chunk { + col.Release() + } + } + }() + + recs := make([]array.Record, len(chunks)) + for i, chunk := range chunks { + recs[i] = array.NewRecord(schema, chunk, -1) + } + + return recs +} + func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface { if mem == nil { mem = memory.NewGoAllocator() @@ -653,6 +754,60 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface bldr.AppendValues(vs, valids) return bldr.NewArray() + case []arrow.MonthInterval: + bldr := array.NewMonthIntervalBuilder(mem) + defer bldr.Release() + + bldr.AppendValues(a, valids) + return bldr.NewArray() + + case []arrow.DayTimeInterval: + bldr := array.NewDayTimeIntervalBuilder(mem) + defer bldr.Release() + + bldr.AppendValues(a, valids) + return bldr.NewArray() + + case []duration_s: + bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Second}) + defer bldr.Release() + vs := make([]arrow.Duration, len(a)) + for i, v := range a { + vs[i] = arrow.Duration(v) + } + bldr.AppendValues(vs, valids) + return bldr.NewArray() + + case []duration_ms: + bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Millisecond}) + defer bldr.Release() + vs := make([]arrow.Duration, len(a)) + for i, v := range a { + vs[i] = arrow.Duration(v) + } + bldr.AppendValues(vs, valids) + return bldr.NewArray() + + case []duration_us: + bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Microsecond}) + defer bldr.Release() + vs := make([]arrow.Duration, len(a)) + for i, v := range a { + vs[i] = arrow.Duration(v) + } + bldr.AppendValues(vs, valids) + return bldr.NewArray() + + case []duration_ns: + bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Nanosecond}) + defer bldr.Release() + vs := make([]arrow.Duration, len(a)) + for i, v := range a { + vs[i] = arrow.Duration(v) + } + bldr.AppendValues(vs, valids) + return bldr.NewArray() + default: panic(fmt.Errorf("arrdata: invalid data slice type %T", a)) } diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index 5638f8b..86a372d 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -362,7 +362,9 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface { *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type, *arrow.Time32Type, *arrow.Time64Type, *arrow.TimestampType, - *arrow.Date32Type, *arrow.Date64Type: + *arrow.Date32Type, *arrow.Date64Type, + *arrow.MonthIntervalType, *arrow.DayTimeIntervalType, + *arrow.DurationType: return ctx.loadPrimitive(dt) case *arrow.BinaryType, *arrow.StringType: diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index 91db1f2..a0e9364 100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -334,6 +334,25 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) { flatbuf.FixedSizeListAddListSize(fv.b, dt.Len()) fv.offset = flatbuf.FixedSizeListEnd(fv.b) + case *arrow.MonthIntervalType: + fv.dtype = flatbuf.TypeInterval + flatbuf.IntervalStart(fv.b) + flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitYEAR_MONTH) + fv.offset = flatbuf.IntervalEnd(fv.b) + + case *arrow.DayTimeIntervalType: + fv.dtype = flatbuf.TypeInterval + flatbuf.IntervalStart(fv.b) + flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitDAY_TIME) + fv.offset = flatbuf.IntervalEnd(fv.b) + + case *arrow.DurationType: + fv.dtype = flatbuf.TypeDuration + unit := unitToFB(dt.Unit) + flatbuf.DurationStart(fv.b) + flatbuf.DurationAddUnit(fv.b, unit) + fv.offset = flatbuf.DurationEnd(fv.b) + default: err := errors.Errorf("arrow/ipc: invalid data type %v", dt) panic(err) // FIXME(sbinet): implement all data-types. @@ -537,6 +556,16 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr dt.Init(data.Bytes, data.Pos) return dateFromFB(dt) + case flatbuf.TypeInterval: + var dt flatbuf.Interval + dt.Init(data.Bytes, data.Pos) + return intervalFromFB(dt) + + case flatbuf.TypeDuration: + var dt flatbuf.Duration + dt.Init(data.Bytes, data.Pos) + return durationFromFB(dt) + default: // FIXME(sbinet): implement all the other types. panic(fmt.Errorf("arrow/ipc: type %v not implemented", flatbuf.EnumNamesType[typ])) @@ -666,6 +695,30 @@ func dateFromFB(data flatbuf.Date) (arrow.DataType, error) { return nil, errors.Errorf("arrow/ipc: Date type with %d unit not implemented", data.Unit()) } +func intervalFromFB(data flatbuf.Interval) (arrow.DataType, error) { + switch data.Unit() { + case flatbuf.IntervalUnitYEAR_MONTH: + return arrow.FixedWidthTypes.MonthInterval, nil + case flatbuf.IntervalUnitDAY_TIME: + return arrow.FixedWidthTypes.DayTimeInterval, nil + } + return nil, errors.Errorf("arrow/ipc: Interval type with %d unit not implemented", data.Unit()) +} + +func durationFromFB(data flatbuf.Duration) (arrow.DataType, error) { + switch data.Unit() { + case flatbuf.TimeUnitSECOND: + return arrow.FixedWidthTypes.Duration_s, nil + case flatbuf.TimeUnitMILLISECOND: + return arrow.FixedWidthTypes.Duration_ms, nil + case flatbuf.TimeUnitMICROSECOND: + return arrow.FixedWidthTypes.Duration_us, nil + case flatbuf.TimeUnitNANOSECOND: + return arrow.FixedWidthTypes.Duration_ns, nil + } + return nil, errors.Errorf("arrow/ipc: Duration type with %d unit not implemented", data.Unit()) +} + type customMetadataer interface { CustomMetadataLength() int CustomMetadata(*flatbuf.KeyValue, int) bool