This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 61f5ab0 ARROW-13529: [Go] Fixing too many releases in IPC writer
61f5ab0 is described below
commit 61f5ab01d0218e194045e117c3efe729581caf55
Author: Matthew Topol <[email protected]>
AuthorDate: Wed Sep 29 16:38:45 2021 -0400
ARROW-13529: [Go] Fixing too many releases in IPC writer
Closes #11270 from zeroshade/arrow-13529
Authored-by: Matthew Topol <[email protected]>
Signed-off-by: Matthew Topol <[email protected]>
---
go/arrow/array/string.go | 20 +++++++++++++-
go/arrow/array/string_test.go | 16 +++++++++++
go/arrow/ipc/writer.go | 44 +++++++++++++++++-------------
go/arrow/ipc/writer_test.go | 62 +++++++++++++++++++++++++++++++++++++++++++
4 files changed, 123 insertions(+), 19 deletions(-)
diff --git a/go/arrow/array/string.go b/go/arrow/array/string.go
index 42e87d8..c8e16ce 100644
--- a/go/arrow/array/string.go
+++ b/go/arrow/array/string.go
@@ -19,6 +19,7 @@ package array
import (
"fmt"
"math"
+ "reflect"
"strings"
"unsafe"
@@ -57,7 +58,24 @@ func (a *String) Value(i int) string {
}
// ValueOffset returns the offset of the value at index i.
-func (a *String) ValueOffset(i int) int { return int(a.offsets[i]) }
+func (a *String) ValueOffset(i int) int {
+ // The upper bound is inclusive on purpose: i == length is accepted so the
+ // end offset of the last value can be read (ValueBytes reads the entry at
+ // offset+length in the same way).
+ if i < 0 || i > a.array.data.length {
+ panic("arrow/array: index out of range")
+ }
+ // Shift by the array's own offset so that i is relative to the first
+ // element of this (possibly sliced) array.
+ return int(a.offsets[i+a.array.data.offset])
+}
+
+// ValueBytes returns the bytes of every value covered by this array, honoring
+// its offset and length, as one contiguous slice.
+//
+// The result aliases the array's value storage instead of copying it, so it
+// must be treated as read-only.
+func (a *String) ValueBytes() (ret []byte) {
+ // An array without an offsets buffer has no value bytes; indexing
+ // a.offsets below would panic in that case.
+ if len(a.offsets) == 0 {
+ return nil
+ }
+
+ beg := a.array.data.offset
+ end := beg + a.array.data.length
+ data := a.values[a.offsets[beg]:a.offsets[end]]
+
+ // Point ret's slice header at data's backing bytes so nothing is copied.
+ s := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
+ s.Data = (*reflect.StringHeader)(unsafe.Pointer(&data)).Data
+ s.Len = len(data)
+ s.Cap = len(data)
+ return
+}
func (a *String) String() string {
o := new(strings.Builder)
diff --git a/go/arrow/array/string_test.go b/go/arrow/array/string_test.go
index 896ee9a..549fe99 100644
--- a/go/arrow/array/string_test.go
+++ b/go/arrow/array/string_test.go
@@ -17,6 +17,7 @@
package array_test
import (
+ "bytes"
"testing"
"github.com/apache/arrow/go/arrow"
@@ -103,6 +104,11 @@ func TestStringArray(t *testing.T) {
if got, want := arr.String(), `["hello" "世界" (null) "bye"]`; got !=
want {
t.Fatalf("got=%q, want=%q", got, want)
}
+
+ if !bytes.Equal([]byte(`hello世界bye`), arr.ValueBytes()) {
+ t.Fatalf("got=%q, want=%q", string(arr.ValueBytes()),
`hello世界bye`)
+ }
+
slice := array.NewSliceData(arr.Data(), 2, 4)
defer slice.Release()
@@ -117,6 +123,16 @@ func TestStringArray(t *testing.T) {
if got, want := v.String(), `[(null) "bye"]`; got != want {
t.Fatalf("got=%q, want=%q", got, want)
}
+
+ if !bytes.Equal(v.ValueBytes(), []byte("bye")) {
+ t.Fatalf("got=%q, want=%q", string(v.ValueBytes()), "bye")
+ }
+
+ for i := 0; i < v.Len(); i++ {
+ if got, want := v.ValueOffset(0), offsets[i+slice.Offset()];
got != want {
+ t.Fatalf("val-offset-with-offset[%d]: got=%q, want=%q",
i, got, want)
+ }
+ }
}
func TestStringBuilder_Empty(t *testing.T) {
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e9c43bb..020601f 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -374,12 +374,11 @@ func (w *recordEncoder) visit(p *Payload, arr
array.Interface) error {
offset := int64(data.Offset()) * typeWidth
// send padding if available
len := minI64(bitutil.CeilByte64(arrLen*typeWidth),
int64(data.Len())-offset)
- data = array.NewSliceData(data, offset, offset+len)
- defer data.Release()
- values = data.Buffers()[1]
- }
- if values != nil {
- values.Retain()
+ values = memory.NewBufferBytes(values.Bytes()[offset :
offset+len])
+ default:
+ if values != nil {
+ values.Retain()
+ }
}
p.body = append(p.body, values)
@@ -402,11 +401,9 @@ func (w *recordEncoder) visit(p *Payload, arr
array.Interface) error {
// slice data buffer to include the range we need now.
var (
beg = int64(arr.ValueOffset(0))
- len = minI64(paddedLength(totalDataBytes,
kArrowAlignment), int64(data.Len())-beg)
+ len = minI64(paddedLength(totalDataBytes,
kArrowAlignment), int64(totalDataBytes))
)
- data = array.NewSliceData(data, beg, beg+len)
- defer data.Release()
- values = data.Buffers()[2]
+ values =
memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len])
default:
if values != nil {
values.Retain()
@@ -426,7 +423,7 @@ func (w *recordEncoder) visit(p *Payload, arr
array.Interface) error {
var totalDataBytes int64
if voffsets != nil {
- totalDataBytes = int64(arr.ValueOffset(arr.Len()) -
arr.ValueOffset(0))
+ totalDataBytes = int64(len(arr.ValueBytes()))
}
switch {
@@ -434,11 +431,9 @@ func (w *recordEncoder) visit(p *Payload, arr
array.Interface) error {
// slice data buffer to include the range we need now.
var (
beg = int64(arr.ValueOffset(0))
- len = minI64(paddedLength(totalDataBytes,
kArrowAlignment), int64(data.Len())-beg)
+ len = minI64(paddedLength(totalDataBytes,
kArrowAlignment), int64(totalDataBytes))
)
- data = array.NewSliceData(data, beg, beg+len)
- defer data.Release()
- values = data.Buffers()[2]
+ values =
memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len])
default:
if values != nil {
values.Retain()
@@ -563,14 +558,27 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr
array.Interface) (*memory.B
data := arr.Data()
voffsets := data.Buffers()[1]
+ // nothing to shift or retain when the offsets buffer is missing or empty.
+ // this has to be checked first: both branches below dereference voffsets.
+ if voffsets == nil || voffsets.Len() == 0 {
+ return nil, nil
+ }
if data.Offset() != 0 {
- // FIXME(sbinet): writer.cc:231
- panic(xerrors.Errorf("not implemented offset=%d", data.Offset()))
+ // if we have a non-zero offset, then the value offsets do not start at
+ // zero. we must a) create a new offsets array with shifted offsets and
+ // b) slice the values array accordingly
+ shiftedOffsets := memory.NewResizableBuffer(w.mem)
+ shiftedOffsets.Resize(arrow.Int32Traits.BytesRequired(data.Len() + 1))
+
+ dest := arrow.Int32Traits.CastFromBytes(shiftedOffsets.Bytes())
+ // data.Len() values are described by data.Len()+1 offsets starting at
+ // data.Offset(), so the upper bound has to include the offset as well
+ // (a bound of data.Len()+2 is only right when the offset is exactly 1).
+ offsets := arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1]
+
+ startOffset := offsets[0]
+ for i, o := range offsets {
+ dest[i] = o - startOffset
+ }
+ voffsets = shiftedOffsets
+ } else {
+ voffsets.Retain()
}
- if voffsets == nil || voffsets.Len() == 0 {
- return nil, nil
- }
- voffsets.Retain()
return voffsets, nil
}
diff --git a/go/arrow/ipc/writer_test.go b/go/arrow/ipc/writer_test.go
new file mode 100644
index 0000000..7bfcf45
--- /dev/null
+++ b/go/arrow/ipc/writer_test.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ipc_test
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/apache/arrow/go/arrow/memory"
+ "github.com/stretchr/testify/assert"
+)
+
+// reproducer from ARROW-13529
+func TestSliceAndWrite(t *testing.T) {
+ alloc := memory.NewGoAllocator()
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "s", Type: arrow.BinaryTypes.String},
+ }, nil)
+
+ b := array.NewRecordBuilder(alloc, schema)
+ defer b.Release()
+
+ b.Field(0).(*array.StringBuilder).AppendValues([]string{"foo", "bar", "baz"}, nil)
+ rec := b.NewRecord()
+ defer rec.Release()
+
+ // sliceAndWrite takes a one-row slice of rec (non-zero offset) and writes
+ // it through a fresh IPC writer.
+ sliceAndWrite := func(rec array.Record, schema *arrow.Schema) {
+ slice := rec.NewSlice(1, 2)
+ defer slice.Release()
+
+ fmt.Println(slice.Columns()[0].(*array.String).Value(0))
+
+ var buf bytes.Buffer
+ w := ipc.NewWriter(&buf, ipc.WithSchema(schema))
+ // check the returned errors: a write that fails without panicking
+ // must not let the test pass.
+ assert.NoError(t, w.Write(slice))
+ assert.NoError(t, w.Close())
+ }
+
+ // the same record is sliced and written twice; the second pass must not
+ // panic either.
+ assert.NotPanics(t, func() {
+ for i := 0; i < 2; i++ {
+ sliceAndWrite(rec, schema)
+ }
+ })
+}