This is an automated email from the ASF dual-hosted git repository.

sbinet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 71bcfdf  ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers 
layout
71bcfdf is described below

commit 71bcfdfd2bb4f82c9652012001ffed882e7b968d
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Tue Jun 11 18:45:23 2019 +0200

    ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout
    
    Author: Sebastien Binet <bi...@cern.ch>
    
    Closes #4517 from sbinet/issue-5551 and squashes the following commits:
    
    47162f07a <Sebastien Binet> ARROW-5551:  implement FixedSizeArrays with 
2-buffers layout
---
 go/arrow/array/array_test.go                   |  2 +-
 go/arrow/array/fixedsize_binary.go             | 45 +++++++----------------
 go/arrow/array/fixedsize_binary_test.go        |  8 +++--
 go/arrow/array/fixedsize_binarybuilder.go      | 50 +++++++-------------------
 go/arrow/array/fixedsize_binarybuilder_test.go |  9 -----
 go/arrow/internal/arrdata/arrdata.go           | 50 ++++++++++++++++++++++++++
 go/arrow/ipc/cmd/arrow-cat/main_test.go        | 32 +++++++++++++++++
 go/arrow/ipc/cmd/arrow-ls/main_test.go         | 26 ++++++++++++++
 go/arrow/ipc/file_reader.go                    |  2 +-
 go/arrow/ipc/writer.go                         | 28 ++++++++++++---
 10 files changed, 165 insertions(+), 87 deletions(-)

diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go
index 3a3407f..884bb8d 100644
--- a/go/arrow/array/array_test.go
+++ b/go/arrow/array/array_test.go
@@ -63,7 +63,7 @@ func TestMakeFromData(t *testing.T) {
                {name: "timestamp", d: &testDataType{arrow.TIMESTAMP}},
                {name: "time32", d: &testDataType{arrow.TIME32}},
                {name: "time64", d: &testDataType{arrow.TIME64}},
-               {name: "fixed_size_binary", d: 
&testDataType{arrow.FIXED_SIZE_BINARY}, size: 3},
+               {name: "fixed_size_binary", d: 
&testDataType{arrow.FIXED_SIZE_BINARY}},
 
                {name: "list", d: &testDataType{arrow.LIST}, child: 
[]*array.Data{
                        array.NewData(&testDataType{arrow.INT64}, 0, 
make([]*memory.Buffer, 4), nil, 0, 0),
diff --git a/go/arrow/array/fixedsize_binary.go 
b/go/arrow/array/fixedsize_binary.go
index db24a86..5134302 100644
--- a/go/arrow/array/fixedsize_binary.go
+++ b/go/arrow/array/fixedsize_binary.go
@@ -26,13 +26,14 @@ import (
 // A type which represents an immutable sequence of fixed-length binary 
strings.
 type FixedSizeBinary struct {
        array
-       valueOffsets []int32
-       valueBytes   []byte
+
+       valueBytes []byte
+       bytewidth  int32
 }
 
 // NewFixedSizeBinaryData constructs a new fixed-size binary array from data.
 func NewFixedSizeBinaryData(data *Data) *FixedSizeBinary {
-       a := &FixedSizeBinary{}
+       a := &FixedSizeBinary{bytewidth: 
int32(data.DataType().(arrow.FixedWidthDataType).BitWidth() / 8)}
        a.refCount = 1
        a.setData(data)
        return a
@@ -41,14 +42,14 @@ func NewFixedSizeBinaryData(data *Data) *FixedSizeBinary {
 // Value returns the fixed-size slice at index i. This value should not be 
mutated.
 func (a *FixedSizeBinary) Value(i int) []byte {
        i += a.array.data.offset
-       return a.valueBytes[a.valueOffsets[i]:a.valueOffsets[i+1]]
+       var (
+               bw  = int(a.bytewidth)
+               beg = i * bw
+               end = (i + 1) * bw
+       )
+       return a.valueBytes[beg:end]
 }
 
-func (a *FixedSizeBinary) ValueOffset(i int) int { return 
int(a.valueOffsets[i]) }
-func (a *FixedSizeBinary) ValueLen(i int) int    { return 
int(a.valueOffsets[i+1] - a.valueOffsets[i]) }
-func (a *FixedSizeBinary) ValueOffsets() []int32 { return a.valueOffsets }
-func (a *FixedSizeBinary) ValueBytes() []byte    { return a.valueBytes }
-
 func (a *FixedSizeBinary) String() string {
        o := new(strings.Builder)
        o.WriteString("[")
@@ -68,32 +69,12 @@ func (a *FixedSizeBinary) String() string {
 }
 
 func (a *FixedSizeBinary) setData(data *Data) {
-       if len(data.buffers) != 3 {
-               panic("len(data.buffers) != 3")
-       }
-
        a.array.setData(data)
-
-       if valueBytes := data.buffers[2]; valueBytes != nil {
-               a.valueBytes = valueBytes.Bytes()
+       vals := data.buffers[1]
+       if vals != nil {
+               a.valueBytes = vals.Bytes()
        }
 
-       switch valueOffsets := data.buffers[1]; valueOffsets {
-       case nil:
-               // re-compute offsets
-               offsets := make([]int32, a.Len()+1)
-               bw := a.DataType().(arrow.FixedWidthDataType).BitWidth() / 8
-               for i := range offsets[1:] {
-                       var delta int32
-                       if a.IsValid(i) {
-                               delta = int32(bw)
-                       }
-                       offsets[i+1] = offsets[i] + delta
-               }
-               a.valueOffsets = offsets
-       default:
-               a.valueOffsets = 
arrow.Int32Traits.CastFromBytes(valueOffsets.Bytes())
-       }
 }
 
 var (
diff --git a/go/arrow/array/fixedsize_binary_test.go 
b/go/arrow/array/fixedsize_binary_test.go
index fdb0fbf..4d2d724 100644
--- a/go/arrow/array/fixedsize_binary_test.go
+++ b/go/arrow/array/fixedsize_binary_test.go
@@ -33,6 +33,8 @@ func TestFixedSizeBinary(t *testing.T) {
        dtype := arrow.FixedSizeBinaryType{ByteWidth: 7}
        b := array.NewFixedSizeBinaryBuilder(mem, &dtype)
 
+       zero := make([]byte, dtype.ByteWidth)
+
        values := [][]byte{
                []byte("7654321"),
                nil,
@@ -48,7 +50,9 @@ func TestFixedSizeBinary(t *testing.T) {
        assert.Equal(t, 3, a.Len())
        assert.Equal(t, 1, a.NullN())
        assert.Equal(t, []byte("7654321"), a.Value(0))
-       assert.Equal(t, []byte{}, a.Value(1))
+       assert.Equal(t, zero, a.Value(1))
+       assert.Equal(t, true, a.IsNull(1))
+       assert.Equal(t, false, a.IsValid(1))
        assert.Equal(t, []byte("AZERTYU"), a.Value(2))
        a.Release()
 
@@ -58,7 +62,7 @@ func TestFixedSizeBinary(t *testing.T) {
        assert.Equal(t, 3, a.Len())
        assert.Equal(t, 1, a.NullN())
        assert.Equal(t, []byte("7654321"), a.Value(0))
-       assert.Equal(t, []byte{}, a.Value(1))
+       assert.Equal(t, zero, a.Value(1))
        assert.Equal(t, []byte("AZERTYU"), a.Value(2))
        a.Release()
 
diff --git a/go/arrow/array/fixedsize_binarybuilder.go 
b/go/arrow/array/fixedsize_binarybuilder.go
index 053a192..8a2f65f 100644
--- a/go/arrow/array/fixedsize_binarybuilder.go
+++ b/go/arrow/array/fixedsize_binarybuilder.go
@@ -17,6 +17,7 @@
 package array
 
 import (
+       "fmt"
        "sync/atomic"
 
        "github.com/apache/arrow/go/arrow"
@@ -28,16 +29,14 @@ import (
 type FixedSizeBinaryBuilder struct {
        builder
 
-       dtype   *arrow.FixedSizeBinaryType
-       offsets *int32BufferBuilder
-       values  *byteBufferBuilder
+       dtype  *arrow.FixedSizeBinaryType
+       values *byteBufferBuilder
 }
 
 func NewFixedSizeBinaryBuilder(mem memory.Allocator, dtype 
*arrow.FixedSizeBinaryType) *FixedSizeBinaryBuilder {
        b := &FixedSizeBinaryBuilder{
                builder: builder{refCount: 1, mem: mem},
                dtype:   dtype,
-               offsets: newInt32BufferBuilder(mem),
                values:  newByteBufferBuilder(mem),
        }
        return b
@@ -54,10 +53,6 @@ func (b *FixedSizeBinaryBuilder) Release() {
                        b.nullBitmap.Release()
                        b.nullBitmap = nil
                }
-               if b.offsets != nil {
-                       b.offsets.Release()
-                       b.offsets = nil
-               }
                if b.values != nil {
                        b.values.Release()
                        b.values = nil
@@ -72,14 +67,13 @@ func (b *FixedSizeBinaryBuilder) Append(v []byte) {
        }
 
        b.Reserve(1)
-       b.appendNextOffset()
        b.values.Append(v)
        b.UnsafeAppendBoolToBitmap(true)
 }
 
 func (b *FixedSizeBinaryBuilder) AppendNull() {
        b.Reserve(1)
-       b.appendNextOffset()
+       b.values.Advance(b.dtype.ByteWidth)
        b.UnsafeAppendBoolToBitmap(false)
 }
 
@@ -97,25 +91,19 @@ func (b *FixedSizeBinaryBuilder) AppendValues(v [][]byte, 
valid []bool) {
 
        b.Reserve(len(v))
        for _, vv := range v {
-               b.appendNextOffset()
-               b.values.Append(vv)
+               switch len(vv) {
+               case 0:
+                       b.values.Advance(b.dtype.ByteWidth)
+               case b.dtype.ByteWidth:
+                       b.values.Append(vv)
+               default:
+                       panic(fmt.Errorf("array: invalid binary length (got=%d, 
want=%d)", len(vv), b.dtype.ByteWidth))
+               }
        }
 
        b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
 }
 
-func (b *FixedSizeBinaryBuilder) Value(i int) []byte {
-       offsets := b.offsets.Values()
-       start := int(offsets[i])
-       var end int
-       if i == (b.length - 1) {
-               end = b.values.Len()
-       } else {
-               end = int(offsets[i+1])
-       }
-       return b.values.Bytes()[start:end]
-}
-
 func (b *FixedSizeBinaryBuilder) init(capacity int) {
        b.builder.init(capacity)
        b.values.resize(capacity * b.dtype.ByteWidth)
@@ -130,7 +118,6 @@ func (b *FixedSizeBinaryBuilder) Reserve(n int) {
 // Resize adjusts the space allocated by b to n elements. If n is greater than 
b.Cap(),
 // additional memory will be allocated. If n is smaller, the allocated memory 
may reduced.
 func (b *FixedSizeBinaryBuilder) Resize(n int) {
-       b.offsets.resize((n + 1) * arrow.Int32SizeBytes)
        b.builder.resize(n, b.init)
 }
 
@@ -150,29 +137,18 @@ func (b *FixedSizeBinaryBuilder) 
NewFixedSizeBinaryArray() (a *FixedSizeBinary)
 }
 
 func (b *FixedSizeBinaryBuilder) newData() (data *Data) {
-       b.appendNextOffset()
        values := b.values.Finish()
-       offsets := b.offsets.Finish()
-       data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, 
offsets, values}, nil, b.nulls, 0)
+       data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, 
values}, nil, b.nulls, 0)
 
        if values != nil {
                values.Release()
        }
-       if offsets != nil {
-               offsets.Release()
-       }
 
        b.builder.reset()
 
        return
 }
 
-func (b *FixedSizeBinaryBuilder) appendNextOffset() {
-       numBytes := b.values.Len()
-       // TODO(alexandre): check binaryArrayMaximumCapacity?
-       b.offsets.AppendValue(int32(numBytes))
-}
-
 var (
        _ Builder = (*FixedSizeBinaryBuilder)(nil)
 )
diff --git a/go/arrow/array/fixedsize_binarybuilder_test.go 
b/go/arrow/array/fixedsize_binarybuilder_test.go
index f50e1b0..08740c5 100644
--- a/go/arrow/array/fixedsize_binarybuilder_test.go
+++ b/go/arrow/array/fixedsize_binarybuilder_test.go
@@ -39,11 +39,6 @@ func TestFixedSizeBinaryBuilder(t *testing.T) {
        assert.Equal(t, 4, b.Len(), "unexpected Len()")
        assert.Equal(t, 2, b.NullN(), "unexpected NullN()")
 
-       assert.Equal(t, b.Value(0), []byte("1234567"))
-       assert.Equal(t, b.Value(1), []byte{})
-       assert.Equal(t, b.Value(2), []byte("ABCDEFG"))
-       assert.Equal(t, b.Value(3), []byte{})
-
        values := [][]byte{
                []byte("7654321"),
                nil,
@@ -54,10 +49,6 @@ func TestFixedSizeBinaryBuilder(t *testing.T) {
        assert.Equal(t, 7, b.Len(), "unexpected Len()")
        assert.Equal(t, 3, b.NullN(), "unexpected NullN()")
 
-       assert.Equal(t, []byte("7654321"), b.Value(4))
-       assert.Equal(t, []byte{}, b.Value(5))
-       assert.Equal(t, []byte("AZERTYU"), b.Value(6))
-
        a := b.NewFixedSizeBinaryArray()
 
        // check state of builder after NewFixedSizeBinaryArray
diff --git a/go/arrow/internal/arrdata/arrdata.go 
b/go/arrow/internal/arrdata/arrdata.go
index 504ade6..b7daf6f 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -39,6 +39,7 @@ func init() {
        Records["strings"] = makeStringsRecords()
        Records["fixed_size_lists"] = makeFixedSizeListsRecords()
        Records["fixed_width_types"] = makeFixedWidthTypesRecords()
+       Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords()
 
        for k := range Records {
                RecordNames = append(RecordNames, k)
@@ -398,6 +399,45 @@ func makeFixedWidthTypesRecords() []array.Record {
        return recs
 }
 
+type fsb3 string
+
+func makeFixedSizeBinariesRecords() []array.Record {
+       mem := memory.NewGoAllocator()
+       schema := arrow.NewSchema(
+               []arrow.Field{
+                       arrow.Field{Name: "fixed_size_binary_3", Type: 
&arrow.FixedSizeBinaryType{ByteWidth: 3}, Nullable: true},
+               }, nil,
+       )
+
+       mask := []bool{true, false, false, true, true}
+       chunks := [][]array.Interface{
+               []array.Interface{
+                       arrayOf(mem, []fsb3{"001", "002", "003", "004", "005"}, 
mask),
+               },
+               []array.Interface{
+                       arrayOf(mem, []fsb3{"011", "012", "013", "014", "015"}, 
mask),
+               },
+               []array.Interface{
+                       arrayOf(mem, []fsb3{"021", "022", "023", "024", "025"}, 
mask),
+               },
+       }
+
+       defer func() {
+               for _, chunk := range chunks {
+                       for _, col := range chunk {
+                               col.Release()
+                       }
+               }
+       }()
+
+       recs := make([]array.Record, len(chunks))
+       for i, chunk := range chunks {
+               recs[i] = array.NewRecord(schema, chunk, -1)
+       }
+
+       return recs
+}
+
 func arrayOf(mem memory.Allocator, a interface{}, valids []bool) 
array.Interface {
        if mem == nil {
                mem = memory.NewGoAllocator()
@@ -567,6 +607,16 @@ func arrayOf(mem memory.Allocator, a interface{}, valids 
[]bool) array.Interface
                bldr.AppendValues(a, valids)
                return bldr.NewArray()
 
+       case []fsb3:
+               bldr := array.NewFixedSizeBinaryBuilder(mem, 
&arrow.FixedSizeBinaryType{ByteWidth: 3})
+               defer bldr.Release()
+               vs := make([][]byte, len(a))
+               for i, v := range a {
+                       vs[i] = []byte(v)
+               }
+               bldr.AppendValues(vs, valids)
+               return bldr.NewArray()
+
        default:
                panic(fmt.Errorf("arrdata: invalid data slice type %T", a))
        }
diff --git a/go/arrow/ipc/cmd/arrow-cat/main_test.go 
b/go/arrow/ipc/cmd/arrow-cat/main_test.go
index 5a8f031..3f9c3e7 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main_test.go
@@ -149,6 +149,16 @@ record 3...
   col[7] "date64s": [-22 (null) (null) 21 22]
 `,
                },
+               {
+                       name: "fixed_size_binaries",
+                       want: `record 1...
+  col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2...
+  col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3...
+  col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+               },
        } {
                t.Run(tc.name, func(t *testing.T) {
                        mem := 
memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -450,6 +460,28 @@ record 3/3...
   col[7] "date64s": [-22 (null) (null) 21 22]
 `,
                },
+               {
+                       stream: true,
+                       name:   "fixed_size_binaries",
+                       want: `record 1...
+  col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2...
+  col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3...
+  col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+               },
+               {
+                       name: "fixed_size_binaries",
+                       want: `version: V4
+record 1/3...
+  col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2/3...
+  col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3/3...
+  col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+               },
        } {
                t.Run(fmt.Sprintf("%s-stream=%v", tc.name, tc.stream), func(t 
*testing.T) {
                        mem := 
memory.NewCheckedAllocator(memory.NewGoAllocator())
diff --git a/go/arrow/ipc/cmd/arrow-ls/main_test.go 
b/go/arrow/ipc/cmd/arrow-ls/main_test.go
index 64cd478..0488eae 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main_test.go
@@ -102,6 +102,14 @@ records: 3
 records: 3
 `,
                },
+               {
+                       name: "fixed_size_binaries",
+                       want: `schema:
+  fields: 1
+    - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+               },
        } {
                t.Run(tc.name, func(t *testing.T) {
                        mem := 
memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -239,6 +247,24 @@ schema:
 records: 4
 `,
                },
+               {
+                       stream: true,
+                       name:   "fixed_size_binaries",
+                       want: `schema:
+  fields: 1
+    - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+               },
+               {
+                       name: "fixed_size_binaries",
+                       want: `version: V4
+schema:
+  fields: 1
+    - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+               },
        } {
                t.Run(fmt.Sprintf("%s-stream=%v", tc.name, tc.stream), func(t 
*testing.T) {
                        mem := 
memory.NewCheckedAllocator(memory.NewGoAllocator())
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index b8f08cc..5638f8b 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -450,7 +450,7 @@ func (ctx *arrayLoaderContext) loadBinary(dt 
arrow.DataType) array.Interface {
 
 func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt 
*arrow.FixedSizeBinaryType) array.Interface {
        field, buffers := ctx.loadCommon(2)
-       buffers = append(buffers, nil, ctx.buffer())
+       buffers = append(buffers, ctx.buffer())
 
        data := array.NewData(dt, int(field.Length()), buffers, nil, 
int(field.NullCount()), 0)
        defer data.Release()
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e252023..e5797ba 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -237,12 +237,19 @@ func (w *recordEncoder) visit(p *payload, arr 
array.Interface) error {
        case arrow.FixedWidthDataType:
                data := arr.Data()
                values := data.Buffers()[1]
-               typeWidth := dtype.BitWidth() / 8
-               minLength := paddedLength(int64(arr.Len())*int64(typeWidth), 
kArrowAlignment)
+               arrLen := int64(arr.Len())
+               typeWidth := int64(dtype.BitWidth() / 8)
+               minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)
 
                switch {
                case needTruncate(int64(data.Offset()), values, minLength):
-                       panic("not implemented") // FIXME(sbinet) writer.cc:212
+                       // non-zero offset: slice the buffer
+                       offset := int64(data.Offset()) * typeWidth
+                       // send padding if available
+                       len := minI64(bitutil.CeilByte64(arrLen*typeWidth), 
int64(data.Len())-offset)
+                       data = array.NewSliceData(data, offset, offset+len)
+                       defer data.Release()
+                       values = data.Buffers()[1]
                default:
                        if values != nil {
                                values.Retain()
@@ -268,7 +275,9 @@ func (w *recordEncoder) visit(p *payload, arr 
array.Interface) error {
                case needTruncate(int64(data.Offset()), values, totalDataBytes):
                        panic("not implemented") // FIXME(sbinet) writer.cc:264
                default:
-                       values.Retain()
+                       if values != nil {
+                               values.Retain()
+                       }
                }
                p.body = append(p.body, voffsets)
                p.body = append(p.body, values)
@@ -291,7 +300,9 @@ func (w *recordEncoder) visit(p *payload, arr 
array.Interface) error {
                case needTruncate(int64(data.Offset()), values, totalDataBytes):
                        panic("not implemented") // FIXME(sbinet) writer.cc:264
                default:
-                       values.Retain()
+                       if values != nil {
+                               values.Retain()
+                       }
                }
                p.body = append(p.body, voffsets)
                p.body = append(p.body, values)
@@ -430,3 +441,10 @@ func needTruncate(offset int64, buf *memory.Buffer, 
minLength int64) bool {
        }
        return offset != 0 || minLength < int64(buf.Len())
 }
+
+func minI64(a, b int64) int64 {
+       if a < b {
+               return a
+       }
+       return b
+}

Reply via email to