This is an automated email from the ASF dual-hosted git repository. sbinet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 71bcfdf ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout 71bcfdf is described below commit 71bcfdfd2bb4f82c9652012001ffed882e7b968d Author: Sebastien Binet <bi...@cern.ch> AuthorDate: Tue Jun 11 18:45:23 2019 +0200 ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout Author: Sebastien Binet <bi...@cern.ch> Closes #4517 from sbinet/issue-5551 and squashes the following commits: 47162f07a <Sebastien Binet> ARROW-5551: implement FixedSizeArrays with 2-buffers layout --- go/arrow/array/array_test.go | 2 +- go/arrow/array/fixedsize_binary.go | 45 +++++++---------------- go/arrow/array/fixedsize_binary_test.go | 8 +++-- go/arrow/array/fixedsize_binarybuilder.go | 50 +++++++------------------- go/arrow/array/fixedsize_binarybuilder_test.go | 9 ----- go/arrow/internal/arrdata/arrdata.go | 50 ++++++++++++++++++++++++++ go/arrow/ipc/cmd/arrow-cat/main_test.go | 32 +++++++++++++++++ go/arrow/ipc/cmd/arrow-ls/main_test.go | 26 ++++++++++++++ go/arrow/ipc/file_reader.go | 2 +- go/arrow/ipc/writer.go | 28 ++++++++++++--- 10 files changed, 165 insertions(+), 87 deletions(-) diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go index 3a3407f..884bb8d 100644 --- a/go/arrow/array/array_test.go +++ b/go/arrow/array/array_test.go @@ -63,7 +63,7 @@ func TestMakeFromData(t *testing.T) { {name: "timestamp", d: &testDataType{arrow.TIMESTAMP}}, {name: "time32", d: &testDataType{arrow.TIME32}}, {name: "time64", d: &testDataType{arrow.TIME64}}, - {name: "fixed_size_binary", d: &testDataType{arrow.FIXED_SIZE_BINARY}, size: 3}, + {name: "fixed_size_binary", d: &testDataType{arrow.FIXED_SIZE_BINARY}}, {name: "list", d: &testDataType{arrow.LIST}, child: []*array.Data{ array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0), diff --git a/go/arrow/array/fixedsize_binary.go b/go/arrow/array/fixedsize_binary.go index db24a86..5134302 100644 --- a/go/arrow/array/fixedsize_binary.go +++ b/go/arrow/array/fixedsize_binary.go @@ -26,13 +26,14 @@ import ( // A type which represents an immutable sequence of fixed-length binary strings. type FixedSizeBinary struct { array - valueOffsets []int32 - valueBytes []byte + + valueBytes []byte + bytewidth int32 } // NewFixedSizeBinaryData constructs a new fixed-size binary array from data. func NewFixedSizeBinaryData(data *Data) *FixedSizeBinary { - a := &FixedSizeBinary{} + a := &FixedSizeBinary{bytewidth: int32(data.DataType().(arrow.FixedWidthDataType).BitWidth() / 8)} a.refCount = 1 a.setData(data) return a @@ -41,14 +42,14 @@ func NewFixedSizeBinaryData(data *Data) *FixedSizeBinary { // Value returns the fixed-size slice at index i. This value should not be mutated. func (a *FixedSizeBinary) Value(i int) []byte { i += a.array.data.offset - return a.valueBytes[a.valueOffsets[i]:a.valueOffsets[i+1]] + var ( + bw = int(a.bytewidth) + beg = i * bw + end = (i + 1) * bw + ) + return a.valueBytes[beg:end] } -func (a *FixedSizeBinary) ValueOffset(i int) int { return int(a.valueOffsets[i]) } -func (a *FixedSizeBinary) ValueLen(i int) int { return int(a.valueOffsets[i+1] - a.valueOffsets[i]) } -func (a *FixedSizeBinary) ValueOffsets() []int32 { return a.valueOffsets } -func (a *FixedSizeBinary) ValueBytes() []byte { return a.valueBytes } - func (a *FixedSizeBinary) String() string { o := new(strings.Builder) o.WriteString("[") @@ -68,32 +69,12 @@ func (a *FixedSizeBinary) String() string { } func (a *FixedSizeBinary) setData(data *Data) { - if len(data.buffers) != 3 { - panic("len(data.buffers) != 3") - } - a.array.setData(data) - - if valueBytes := data.buffers[2]; valueBytes != nil { - a.valueBytes = valueBytes.Bytes() + vals := data.buffers[1] + if vals != nil { + a.valueBytes = vals.Bytes() } - switch valueOffsets := data.buffers[1]; valueOffsets { - case nil: - // re-compute offsets - offsets := make([]int32, a.Len()+1) - bw := a.DataType().(arrow.FixedWidthDataType).BitWidth() / 8 - for i := range offsets[1:] { - var delta int32 - if a.IsValid(i) { - delta = int32(bw) - } - offsets[i+1] = offsets[i] + delta - } - a.valueOffsets = offsets - default: - a.valueOffsets = arrow.Int32Traits.CastFromBytes(valueOffsets.Bytes()) - } } var ( diff --git a/go/arrow/array/fixedsize_binary_test.go b/go/arrow/array/fixedsize_binary_test.go index fdb0fbf..4d2d724 100644 --- a/go/arrow/array/fixedsize_binary_test.go +++ b/go/arrow/array/fixedsize_binary_test.go @@ -33,6 +33,8 @@ func TestFixedSizeBinary(t *testing.T) { dtype := arrow.FixedSizeBinaryType{ByteWidth: 7} b := array.NewFixedSizeBinaryBuilder(mem, &dtype) + zero := make([]byte, dtype.ByteWidth) + values := [][]byte{ []byte("7654321"), nil, @@ -48,7 +50,9 @@ func TestFixedSizeBinary(t *testing.T) { assert.Equal(t, 3, a.Len()) assert.Equal(t, 1, a.NullN()) assert.Equal(t, []byte("7654321"), a.Value(0)) - assert.Equal(t, []byte{}, a.Value(1)) + assert.Equal(t, zero, a.Value(1)) + assert.Equal(t, true, a.IsNull(1)) + assert.Equal(t, false, a.IsValid(1)) assert.Equal(t, []byte("AZERTYU"), a.Value(2)) a.Release() @@ -58,7 +62,7 @@ func TestFixedSizeBinary(t *testing.T) { assert.Equal(t, 3, a.Len()) assert.Equal(t, 1, a.NullN()) assert.Equal(t, []byte("7654321"), a.Value(0)) - assert.Equal(t, []byte{}, a.Value(1)) + assert.Equal(t, zero, a.Value(1)) assert.Equal(t, []byte("AZERTYU"), a.Value(2)) a.Release() diff --git a/go/arrow/array/fixedsize_binarybuilder.go b/go/arrow/array/fixedsize_binarybuilder.go index 053a192..8a2f65f 100644 --- a/go/arrow/array/fixedsize_binarybuilder.go +++ b/go/arrow/array/fixedsize_binarybuilder.go @@ -17,6 +17,7 @@ package array import ( + "fmt" "sync/atomic" "github.com/apache/arrow/go/arrow" @@ -28,16 +29,14 @@ import ( type FixedSizeBinaryBuilder struct { builder - dtype *arrow.FixedSizeBinaryType - offsets *int32BufferBuilder - values *byteBufferBuilder + dtype *arrow.FixedSizeBinaryType + values *byteBufferBuilder } func NewFixedSizeBinaryBuilder(mem memory.Allocator, dtype *arrow.FixedSizeBinaryType) *FixedSizeBinaryBuilder { b := &FixedSizeBinaryBuilder{ builder: builder{refCount: 1, mem: mem}, dtype: dtype, - offsets: newInt32BufferBuilder(mem), values: newByteBufferBuilder(mem), } return b @@ -54,10 +53,6 @@ func (b *FixedSizeBinaryBuilder) Release() { b.nullBitmap.Release() b.nullBitmap = nil } - if b.offsets != nil { - b.offsets.Release() - b.offsets = nil - } if b.values != nil { b.values.Release() b.values = nil @@ -72,14 +67,13 @@ func (b *FixedSizeBinaryBuilder) Append(v []byte) { } b.Reserve(1) - b.appendNextOffset() b.values.Append(v) b.UnsafeAppendBoolToBitmap(true) } func (b *FixedSizeBinaryBuilder) AppendNull() { b.Reserve(1) - b.appendNextOffset() + b.values.Advance(b.dtype.ByteWidth) b.UnsafeAppendBoolToBitmap(false) } @@ -97,25 +91,19 @@ func (b *FixedSizeBinaryBuilder) AppendValues(v [][]byte, valid []bool) { b.Reserve(len(v)) for _, vv := range v { - b.appendNextOffset() - b.values.Append(vv) + switch len(vv) { + case 0: + b.values.Advance(b.dtype.ByteWidth) + case b.dtype.ByteWidth: + b.values.Append(vv) + default: + panic(fmt.Errorf("array: invalid binary length (got=%d, want=%d)", len(vv), b.dtype.ByteWidth)) + } } b.builder.unsafeAppendBoolsToBitmap(valid, len(v)) } -func (b *FixedSizeBinaryBuilder) Value(i int) []byte { - offsets := b.offsets.Values() - start := int(offsets[i]) - var end int - if i == (b.length - 1) { - end = b.values.Len() - } else { - end = int(offsets[i+1]) - } - return b.values.Bytes()[start:end] -} - func (b *FixedSizeBinaryBuilder) init(capacity int) { b.builder.init(capacity) b.values.resize(capacity * b.dtype.ByteWidth) @@ -130,7 +118,6 @@ func (b *FixedSizeBinaryBuilder) Reserve(n int) { // Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(), // additional memory will be allocated. If n is smaller, the allocated memory may reduced. func (b *FixedSizeBinaryBuilder) Resize(n int) { - b.offsets.resize((n + 1) * arrow.Int32SizeBytes) b.builder.resize(n, b.init) } @@ -150,29 +137,18 @@ func (b *FixedSizeBinaryBuilder) NewFixedSizeBinaryArray() (a *FixedSizeBinary) } func (b *FixedSizeBinaryBuilder) newData() (data *Data) { - b.appendNextOffset() values := b.values.Finish() - offsets := b.offsets.Finish() - data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, offsets, values}, nil, b.nulls, 0) + data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, values}, nil, b.nulls, 0) if values != nil { values.Release() } - if offsets != nil { - offsets.Release() - } b.builder.reset() return } -func (b *FixedSizeBinaryBuilder) appendNextOffset() { - numBytes := b.values.Len() - // TODO(alexandre): check binaryArrayMaximumCapacity? - b.offsets.AppendValue(int32(numBytes)) -} - var ( _ Builder = (*FixedSizeBinaryBuilder)(nil) ) diff --git a/go/arrow/array/fixedsize_binarybuilder_test.go b/go/arrow/array/fixedsize_binarybuilder_test.go index f50e1b0..08740c5 100644 --- a/go/arrow/array/fixedsize_binarybuilder_test.go +++ b/go/arrow/array/fixedsize_binarybuilder_test.go @@ -39,11 +39,6 @@ func TestFixedSizeBinaryBuilder(t *testing.T) { assert.Equal(t, 4, b.Len(), "unexpected Len()") assert.Equal(t, 2, b.NullN(), "unexpected NullN()") - assert.Equal(t, b.Value(0), []byte("1234567")) - assert.Equal(t, b.Value(1), []byte{}) - assert.Equal(t, b.Value(2), []byte("ABCDEFG")) - assert.Equal(t, b.Value(3), []byte{}) - values := [][]byte{ []byte("7654321"), nil, @@ -54,10 +49,6 @@ func TestFixedSizeBinaryBuilder(t *testing.T) { assert.Equal(t, 7, b.Len(), "unexpected Len()") assert.Equal(t, 3, b.NullN(), "unexpected NullN()") - assert.Equal(t, []byte("7654321"), b.Value(4)) - assert.Equal(t, []byte{}, b.Value(5)) - assert.Equal(t, []byte("AZERTYU"), b.Value(6)) - a := b.NewFixedSizeBinaryArray() // check state of builder after NewFixedSizeBinaryArray diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go index 504ade6..b7daf6f 100644 --- a/go/arrow/internal/arrdata/arrdata.go +++ b/go/arrow/internal/arrdata/arrdata.go @@ -39,6 +39,7 @@ func init() { Records["strings"] = makeStringsRecords() Records["fixed_size_lists"] = makeFixedSizeListsRecords() Records["fixed_width_types"] = makeFixedWidthTypesRecords() + Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords() for k := range Records { RecordNames = append(RecordNames, k) @@ -398,6 +399,45 @@ func makeFixedWidthTypesRecords() []array.Record { return recs } +type fsb3 string + +func makeFixedSizeBinariesRecords() []array.Record { + mem := memory.NewGoAllocator() + schema := arrow.NewSchema( + []arrow.Field{ + arrow.Field{Name: "fixed_size_binary_3", Type: &arrow.FixedSizeBinaryType{ByteWidth: 3}, Nullable: true}, + }, nil, + ) + + mask := []bool{true, false, false, true, true} + chunks := [][]array.Interface{ + []array.Interface{ + arrayOf(mem, []fsb3{"001", "002", "003", "004", "005"}, mask), + }, + []array.Interface{ + arrayOf(mem, []fsb3{"011", "012", "013", "014", "015"}, mask), + }, + []array.Interface{ + arrayOf(mem, []fsb3{"021", "022", "023", "024", "025"}, mask), + }, + } + + defer func() { + for _, chunk := range chunks { + for _, col := range chunk { + col.Release() + } + } + }() + + recs := make([]array.Record, len(chunks)) + for i, chunk := range chunks { + recs[i] = array.NewRecord(schema, chunk, -1) + } + + return recs +} + func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface { if mem == nil { mem = memory.NewGoAllocator() @@ -567,6 +607,16 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface bldr.AppendValues(a, valids) return bldr.NewArray() + case []fsb3: + bldr := array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: 3}) + defer bldr.Release() + vs := make([][]byte, len(a)) + for i, v := range a { + vs[i] = []byte(v) + } + bldr.AppendValues(vs, valids) + return bldr.NewArray() + default: panic(fmt.Errorf("arrdata: invalid data slice type %T", a)) } diff --git a/go/arrow/ipc/cmd/arrow-cat/main_test.go b/go/arrow/ipc/cmd/arrow-cat/main_test.go index 5a8f031..3f9c3e7 100644 --- a/go/arrow/ipc/cmd/arrow-cat/main_test.go +++ b/go/arrow/ipc/cmd/arrow-cat/main_test.go @@ -149,6 +149,16 @@ record 3... col[7] "date64s": [-22 (null) (null) 21 22] `, }, + { + name: "fixed_size_binaries", + want: `record 1... + col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"] +record 2... + col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"] +record 3... + col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"] +`, + }, } { t.Run(tc.name, func(t *testing.T) { mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) @@ -450,6 +460,28 @@ record 3/3... col[7] "date64s": [-22 (null) (null) 21 22] `, }, + { + stream: true, + name: "fixed_size_binaries", + want: `record 1... + col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"] +record 2... + col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"] +record 3... + col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"] +`, + }, + { + name: "fixed_size_binaries", + want: `version: V4 +record 1/3... + col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"] +record 2/3... + col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"] +record 3/3... + col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"] +`, + }, } { t.Run(fmt.Sprintf("%s-stream=%v", tc.name, tc.stream), func(t *testing.T) { mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) diff --git a/go/arrow/ipc/cmd/arrow-ls/main_test.go b/go/arrow/ipc/cmd/arrow-ls/main_test.go index 64cd478..0488eae 100644 --- a/go/arrow/ipc/cmd/arrow-ls/main_test.go +++ b/go/arrow/ipc/cmd/arrow-ls/main_test.go @@ -102,6 +102,14 @@ records: 3 records: 3 `, }, + { + name: "fixed_size_binaries", + want: `schema: + fields: 1 + - fixed_size_binary_3: type=fixed_size_binary[3], nullable +records: 3 +`, + }, } { t.Run(tc.name, func(t *testing.T) { mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) @@ -239,6 +247,24 @@ schema: records: 4 `, }, + { + stream: true, + name: "fixed_size_binaries", + want: `schema: + fields: 1 + - fixed_size_binary_3: type=fixed_size_binary[3], nullable +records: 3 +`, + }, + { + name: "fixed_size_binaries", + want: `version: V4 +schema: + fields: 1 + - fixed_size_binary_3: type=fixed_size_binary[3], nullable +records: 3 +`, + }, } { t.Run(fmt.Sprintf("%s-stream=%v", tc.name, tc.stream), func(t *testing.T) { mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index b8f08cc..5638f8b 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -450,7 +450,7 @@ func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface { func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) array.Interface { field, buffers := ctx.loadCommon(2) - buffers = append(buffers, nil, ctx.buffer()) + buffers = append(buffers, ctx.buffer()) data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) defer data.Release() diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index e252023..e5797ba 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -237,12 +237,19 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error { case arrow.FixedWidthDataType: data := arr.Data() values := data.Buffers()[1] - typeWidth := dtype.BitWidth() / 8 - minLength := paddedLength(int64(arr.Len())*int64(typeWidth), kArrowAlignment) + arrLen := int64(arr.Len()) + typeWidth := int64(dtype.BitWidth() / 8) + minLength := paddedLength(arrLen*typeWidth, kArrowAlignment) switch { case needTruncate(int64(data.Offset()), values, minLength): - panic("not implemented") // FIXME(sbinet) writer.cc:212 + // non-zero offset: slice the buffer + offset := int64(data.Offset()) * typeWidth + // send padding if available + len := minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(data.Len())-offset) + data = array.NewSliceData(data, offset, offset+len) + defer data.Release() + values = data.Buffers()[1] default: if values != nil { values.Retain() @@ -268,7 +275,9 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error { case needTruncate(int64(data.Offset()), values, totalDataBytes): panic("not implemented") // FIXME(sbinet) writer.cc:264 default: - values.Retain() + if values != nil { + values.Retain() + } } p.body = append(p.body, voffsets) p.body = append(p.body, values) @@ -291,7 +300,9 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error { case needTruncate(int64(data.Offset()), values, totalDataBytes): panic("not implemented") // FIXME(sbinet) writer.cc:264 default: - values.Retain() + if values != nil { + values.Retain() + } } p.body = append(p.body, voffsets) p.body = append(p.body, values) @@ -430,3 +441,10 @@ func needTruncate(offset int64, buf *memory.Buffer, minLength int64) bool { } return offset != 0 || minLength < int64(buf.Len()) } + +func minI64(a, b int64) int64 { + if a < b { + return a + } + return b +}